2018-01-31 15:32:04 +01:00
/*
Package buffer provides http.Handler middleware that solves several problems when dealing with http requests:

Reads the entire request and response into a buffer, optionally spilling it to disk for large requests.

Checks the size limits for requests and responses, rejecting them if a limit is exceeded.

Converts chunked requests (Transfer-Encoding: chunked) into a plain body and provides the total size to the handlers.

Examples of a buffering middleware:

	// sample HTTP handler
	handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		w.Write([]byte("hello"))
	})

	// Buffer will read the body into a buffer before passing the request to the handler,
	// calculate the total size of the request and transform it from chunked encoding
	// before passing it to the server
	buffer.New(handler)

	// This version will buffer up to 2MB in memory and will serialize any extra
	// to a temporary file; if the request size exceeds 10MB it will reject the request
	buffer.New(handler,
		buffer.MemRequestBodyBytes(2 * 1024 * 1024),
		buffer.MaxRequestBodyBytes(10 * 1024 * 1024))

	// Will do the same as above, but with responses
	buffer.New(handler,
		buffer.MemResponseBodyBytes(2 * 1024 * 1024),
		buffer.MaxResponseBodyBytes(10 * 1024 * 1024))

	// Buffer will replay the request on network errors, making at most 3 attempts
	// in total, before returning the response
	buffer.New(handler, buffer.Retry(`IsNetworkError() && Attempts() <= 2`))
*/
package buffer
import (
2018-08-20 10:38:03 +02:00
"bufio"
2018-01-31 15:32:04 +01:00
"fmt"
"io"
"io/ioutil"
"net"
2018-08-20 10:38:03 +02:00
"net/http"
2018-01-31 15:32:04 +01:00
"reflect"
"github.com/mailgun/multibuf"
log "github.com/sirupsen/logrus"
"github.com/vulcand/oxy/utils"
)
// Defaults applied by New before any option setter runs.
const (
	// DefaultMemBodyBytes is the number of body bytes kept in RAM (1 MiB)
	// before the excess is serialized to disk.
	DefaultMemBodyBytes = 1 << 20
	// DefaultMaxBodyBytes is the default total body size limit; -1 means no limit.
	DefaultMaxBodyBytes = -1
	// DefaultMaxRetryAttempts is the highest attempt number for which ServeHTTP
	// still consults the retry predicate.
	DefaultMaxRetryAttempts = 10
)
// errHandler is the ErrorHandler New installs when no handler has been
// configured: a SizeErrHandler.
var errHandler = utils.ErrorHandler(&SizeErrHandler{})
// Buffer is responsible for buffering requests and responses
// It buffers large requests and responses to disk,
type Buffer struct {
maxRequestBodyBytes int64
memRequestBodyBytes int64
maxResponseBodyBytes int64
memResponseBodyBytes int64
retryPredicate hpredicate
next http . Handler
errHandler utils . ErrorHandler
2018-08-20 10:38:03 +02:00
log * log . Logger
2018-01-31 15:32:04 +01:00
}
// New returns a new buffer middleware. New() function supports optional functional arguments
func New ( next http . Handler , setters ... optSetter ) ( * Buffer , error ) {
strm := & Buffer {
next : next ,
maxRequestBodyBytes : DefaultMaxBodyBytes ,
memRequestBodyBytes : DefaultMemBodyBytes ,
maxResponseBodyBytes : DefaultMaxBodyBytes ,
memResponseBodyBytes : DefaultMemBodyBytes ,
2018-08-20 10:38:03 +02:00
log : log . StandardLogger ( ) ,
2018-01-31 15:32:04 +01:00
}
for _ , s := range setters {
if err := s ( strm ) ; err != nil {
return nil , err
}
}
if strm . errHandler == nil {
strm . errHandler = errHandler
}
return strm , nil
}
2018-08-20 10:38:03 +02:00
// Logger defines the logger the buffer will use.
//
// It defaults to logrus.StandardLogger(), the global logger used by logrus.
// Passing nil selects logrus.StandardLogger() as well. Storing a nil logger
// would make ServeHTTP panic the first time it checks the log level.
func Logger(l *log.Logger) optSetter {
	return func(b *Buffer) error {
		logger := l
		if logger == nil {
			logger = log.StandardLogger()
		}
		b.log = logger
		return nil
	}
}
2018-02-12 17:24:03 +01:00
type optSetter func ( b * Buffer ) error
// CondSetter Conditional setter.
// ex: Cond(a > 4, MemRequestBodyBytes(a))
func CondSetter ( condition bool , setter optSetter ) optSetter {
2018-02-15 16:06:03 +01:00
if ! condition {
2018-02-12 17:24:03 +01:00
// NoOp setter
return func ( * Buffer ) error {
return nil
}
}
return setter
}
2018-01-31 15:32:04 +01:00
// Retry provides a predicate that allows buffer middleware to replay the request
// if it matches certain condition, e.g. returns special error code. Available functions are:
//
// Attempts() - limits the amount of retry attempts
// ResponseCode() - returns http response code
// IsNetworkError() - tests if response code is related to networking error
//
// Example of the predicate:
//
// `Attempts() <= 2 && ResponseCode() == 502`
//
func Retry ( predicate string ) optSetter {
2018-02-12 17:24:03 +01:00
return func ( b * Buffer ) error {
2018-01-31 15:32:04 +01:00
p , err := parseExpression ( predicate )
if err != nil {
return err
}
2018-02-12 17:24:03 +01:00
b . retryPredicate = p
2018-01-31 15:32:04 +01:00
return nil
}
}
// ErrorHandler sets error handler of the server
func ErrorHandler ( h utils . ErrorHandler ) optSetter {
2018-02-12 17:24:03 +01:00
return func ( b * Buffer ) error {
b . errHandler = h
2018-01-31 15:32:04 +01:00
return nil
}
}
// MaxRequestBodyBytes sets the maximum request body size in bytes
func MaxRequestBodyBytes ( m int64 ) optSetter {
2018-02-12 17:24:03 +01:00
return func ( b * Buffer ) error {
2018-01-31 15:32:04 +01:00
if m < 0 {
return fmt . Errorf ( "max bytes should be >= 0 got %d" , m )
}
2018-02-12 17:24:03 +01:00
b . maxRequestBodyBytes = m
2018-01-31 15:32:04 +01:00
return nil
}
}
2018-08-20 10:38:03 +02:00
// MemRequestBodyBytes bytes sets the maximum request body to be stored in memory
2018-01-31 15:32:04 +01:00
// buffer middleware will serialize the excess to disk.
func MemRequestBodyBytes ( m int64 ) optSetter {
2018-02-12 17:24:03 +01:00
return func ( b * Buffer ) error {
2018-01-31 15:32:04 +01:00
if m < 0 {
return fmt . Errorf ( "mem bytes should be >= 0 got %d" , m )
}
2018-02-12 17:24:03 +01:00
b . memRequestBodyBytes = m
2018-01-31 15:32:04 +01:00
return nil
}
}
// MaxResponseBodyBytes sets the maximum request body size in bytes
func MaxResponseBodyBytes ( m int64 ) optSetter {
2018-02-12 17:24:03 +01:00
return func ( b * Buffer ) error {
2018-01-31 15:32:04 +01:00
if m < 0 {
return fmt . Errorf ( "max bytes should be >= 0 got %d" , m )
}
2018-02-12 17:24:03 +01:00
b . maxResponseBodyBytes = m
2018-01-31 15:32:04 +01:00
return nil
}
}
// MemResponseBodyBytes sets the maximum request body to be stored in memory
// buffer middleware will serialize the excess to disk.
func MemResponseBodyBytes ( m int64 ) optSetter {
2018-02-12 17:24:03 +01:00
return func ( b * Buffer ) error {
2018-01-31 15:32:04 +01:00
if m < 0 {
return fmt . Errorf ( "mem bytes should be >= 0 got %d" , m )
}
2018-02-12 17:24:03 +01:00
b . memResponseBodyBytes = m
2018-01-31 15:32:04 +01:00
return nil
}
}
// Wrap sets the next handler to be called by buffer handler.
2018-02-12 17:24:03 +01:00
func ( b * Buffer ) Wrap ( next http . Handler ) error {
b . next = next
2018-01-31 15:32:04 +01:00
return nil
}
2018-02-12 17:24:03 +01:00
func ( b * Buffer ) ServeHTTP ( w http . ResponseWriter , req * http . Request ) {
2018-08-20 10:38:03 +02:00
if b . log . Level >= log . DebugLevel {
logEntry := b . log . WithField ( "Request" , utils . DumpHttpRequest ( req ) )
2018-01-31 15:32:04 +01:00
logEntry . Debug ( "vulcand/oxy/buffer: begin ServeHttp on request" )
2018-06-04 14:14:03 +02:00
defer logEntry . Debug ( "vulcand/oxy/buffer: completed ServeHttp on request" )
2018-01-31 15:32:04 +01:00
}
2018-02-12 17:24:03 +01:00
if err := b . checkLimit ( req ) ; err != nil {
2018-01-31 15:32:04 +01:00
log . Errorf ( "vulcand/oxy/buffer: request body over limit, err: %v" , err )
2018-02-12 17:24:03 +01:00
b . errHandler . ServeHTTP ( w , req , err )
2018-01-31 15:32:04 +01:00
return
}
// Read the body while keeping limits in mind. This reader controls the maximum bytes
// to read into memory and disk. This reader returns an error if the total request size exceeds the
2018-08-20 10:38:03 +02:00
// predefined MaxSizeBytes. This can occur if we got chunked request, in this case ContentLength would be set to -1
2018-01-31 15:32:04 +01:00
// and the reader would be unbounded bufio in the http.Server
2018-02-12 17:24:03 +01:00
body , err := multibuf . New ( req . Body , multibuf . MaxBytes ( b . maxRequestBodyBytes ) , multibuf . MemBytes ( b . memRequestBodyBytes ) )
2018-01-31 15:32:04 +01:00
if err != nil || body == nil {
2018-08-20 10:38:03 +02:00
b . log . Errorf ( "vulcand/oxy/buffer: error when reading request body, err: %v" , err )
2018-02-12 17:24:03 +01:00
b . errHandler . ServeHTTP ( w , req , err )
2018-01-31 15:32:04 +01:00
return
}
// Set request body to buffered reader that can replay the read and execute Seek
// Note that we don't change the original request body as it's handled by the http server
// and we don'w want to mess with standard library
defer func ( ) {
if body != nil {
errClose := body . Close ( )
if errClose != nil {
log . Errorf ( "vulcand/oxy/buffer: failed to close body, err: %v" , errClose )
}
}
} ( )
// We need to set ContentLength based on known request size. The incoming request may have been
// set without content length or using chunked TransferEncoding
totalSize , err := body . Size ( )
if err != nil {
2018-08-20 10:38:03 +02:00
b . log . Errorf ( "vulcand/oxy/buffer: failed to get request size, err: %v" , err )
2018-02-12 17:24:03 +01:00
b . errHandler . ServeHTTP ( w , req , err )
2018-01-31 15:32:04 +01:00
return
}
if totalSize == 0 {
body = nil
}
2018-02-12 17:24:03 +01:00
outreq := b . copyRequest ( req , body , totalSize )
2018-01-31 15:32:04 +01:00
attempt := 1
for {
// We create a special writer that will limit the response size, buffer it to disk if necessary
2018-02-12 17:24:03 +01:00
writer , err := multibuf . NewWriterOnce ( multibuf . MaxBytes ( b . maxResponseBodyBytes ) , multibuf . MemBytes ( b . memResponseBodyBytes ) )
2018-01-31 15:32:04 +01:00
if err != nil {
2018-08-20 10:38:03 +02:00
b . log . Errorf ( "vulcand/oxy/buffer: failed create response writer, err: %v" , err )
2018-02-12 17:24:03 +01:00
b . errHandler . ServeHTTP ( w , req , err )
2018-01-31 15:32:04 +01:00
return
}
// We are mimicking http.ResponseWriter to replace writer with our special writer
2018-02-12 17:24:03 +01:00
bw := & bufferWriter {
2018-01-31 15:32:04 +01:00
header : make ( http . Header ) ,
buffer : writer ,
responseWriter : w ,
2018-08-20 10:38:03 +02:00
log : b . log ,
2018-01-31 15:32:04 +01:00
}
2018-02-12 17:24:03 +01:00
defer bw . Close ( )
2018-01-31 15:32:04 +01:00
2018-02-12 17:24:03 +01:00
b . next . ServeHTTP ( bw , outreq )
if bw . hijacked {
2018-08-20 10:38:03 +02:00
b . log . Debugf ( "vulcand/oxy/buffer: connection was hijacked downstream. Not taking any action in buffer." )
2018-01-31 15:32:04 +01:00
return
}
var reader multibuf . MultiReader
2018-02-12 17:24:03 +01:00
if bw . expectBody ( outreq ) {
2018-01-31 15:32:04 +01:00
rdr , err := writer . Reader ( )
if err != nil {
2018-08-20 10:38:03 +02:00
b . log . Errorf ( "vulcand/oxy/buffer: failed to read response, err: %v" , err )
2018-02-12 17:24:03 +01:00
b . errHandler . ServeHTTP ( w , req , err )
2018-01-31 15:32:04 +01:00
return
}
defer rdr . Close ( )
reader = rdr
}
2018-02-12 17:24:03 +01:00
if ( b . retryPredicate == nil || attempt > DefaultMaxRetryAttempts ) ||
! b . retryPredicate ( & context { r : req , attempt : attempt , responseCode : bw . code } ) {
utils . CopyHeaders ( w . Header ( ) , bw . Header ( ) )
w . WriteHeader ( bw . code )
2018-01-31 15:32:04 +01:00
if reader != nil {
io . Copy ( w , reader )
}
return
}
2018-08-20 10:38:03 +02:00
attempt ++
2018-01-31 15:32:04 +01:00
if body != nil {
if _ , err := body . Seek ( 0 , 0 ) ; err != nil {
2018-08-20 10:38:03 +02:00
b . log . Errorf ( "vulcand/oxy/buffer: failed to rewind response body, err: %v" , err )
2018-02-12 17:24:03 +01:00
b . errHandler . ServeHTTP ( w , req , err )
2018-01-31 15:32:04 +01:00
return
}
}
2018-02-12 17:24:03 +01:00
outreq = b . copyRequest ( req , body , totalSize )
2018-08-20 10:38:03 +02:00
b . log . Debugf ( "vulcand/oxy/buffer: retry Request(%v %v) attempt %v" , req . Method , req . URL , attempt )
2018-01-31 15:32:04 +01:00
}
}
2018-02-12 17:24:03 +01:00
func ( b * Buffer ) copyRequest ( req * http . Request , body io . ReadCloser , bodySize int64 ) * http . Request {
2018-01-31 15:32:04 +01:00
o := * req
o . URL = utils . CopyURL ( req . URL )
o . Header = make ( http . Header )
utils . CopyHeaders ( o . Header , req . Header )
o . ContentLength = bodySize
// remove TransferEncoding that could have been previously set because we have transformed the request from chunked encoding
o . TransferEncoding = [ ] string { }
// http.Transport will close the request body on any error, we are controlling the close process ourselves, so we override the closer here
if body == nil {
o . Body = nil
} else {
o . Body = ioutil . NopCloser ( body . ( io . Reader ) )
}
return & o
}
2018-02-12 17:24:03 +01:00
func ( b * Buffer ) checkLimit ( req * http . Request ) error {
if b . maxRequestBodyBytes <= 0 {
2018-01-31 15:32:04 +01:00
return nil
}
2018-02-12 17:24:03 +01:00
if req . ContentLength > b . maxRequestBodyBytes {
return & multibuf . MaxSizeReachedError { MaxSize : b . maxRequestBodyBytes }
2018-01-31 15:32:04 +01:00
}
return nil
}
type bufferWriter struct {
header http . Header
code int
buffer multibuf . WriterOnce
responseWriter http . ResponseWriter
hijacked bool
2018-08-20 10:38:03 +02:00
log * log . Logger
2018-01-31 15:32:04 +01:00
}
// RFC2616 #4.4
func ( b * bufferWriter ) expectBody ( r * http . Request ) bool {
if r . Method == "HEAD" {
return false
}
if ( b . code >= 100 && b . code < 200 ) || b . code == 204 || b . code == 304 {
return false
}
2018-04-10 17:24:04 +02:00
// refer to https://github.com/vulcand/oxy/issues/113
// if b.header.Get("Content-Length") == "" && b.header.Get("Transfer-Encoding") == "" {
// return false
// }
2018-01-31 15:32:04 +01:00
if b . header . Get ( "Content-Length" ) == "0" {
return false
}
return true
}
// Header returns the header map filled in by the downstream handler;
// ServeHTTP copies it onto the real response writer afterwards.
func (b *bufferWriter) Header() http.Header {
	return b.header
}

// WriteHeader records code; ServeHTTP sends it to the client later.
func (b *bufferWriter) WriteHeader(code int) {
	b.code = code
}

// Write stores buf in the response buffer instead of sending it to the client.
func (b *bufferWriter) Write(buf []byte) (int, error) {
	n, err := b.buffer.Write(buf)
	return n, err
}

// Close releases the response buffer.
func (b *bufferWriter) Close() error {
	err := b.buffer.Close()
	return err
}
2018-08-20 10:38:03 +02:00
// CloseNotifier interface - this allows downstream connections to be terminated when the client terminates.
2018-01-31 15:32:04 +01:00
func ( b * bufferWriter ) CloseNotify ( ) <- chan bool {
if cn , ok := b . responseWriter . ( http . CloseNotifier ) ; ok {
return cn . CloseNotify ( )
}
2018-08-20 10:38:03 +02:00
b . log . Warningf ( "Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel." , reflect . TypeOf ( b . responseWriter ) )
2018-01-31 15:32:04 +01:00
return make ( <- chan bool )
}
2018-08-20 10:38:03 +02:00
// Hijack This allows connections to be hijacked for websockets for instance.
2018-01-31 15:32:04 +01:00
func ( b * bufferWriter ) Hijack ( ) ( net . Conn , * bufio . ReadWriter , error ) {
if hi , ok := b . responseWriter . ( http . Hijacker ) ; ok {
conn , rw , err := hi . Hijack ( )
if err != nil {
b . hijacked = true
}
return conn , rw , err
}
2018-08-20 10:38:03 +02:00
b . log . Warningf ( "Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel." , reflect . TypeOf ( b . responseWriter ) )
2018-01-31 15:32:04 +01:00
return nil , nil , fmt . Errorf ( "The response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v" , reflect . TypeOf ( b . responseWriter ) )
}
2018-08-20 10:38:03 +02:00
// SizeErrHandler Size error handler
type SizeErrHandler struct { }
2018-01-31 15:32:04 +01:00
func ( e * SizeErrHandler ) ServeHTTP ( w http . ResponseWriter , req * http . Request , err error ) {
if _ , ok := err . ( * multibuf . MaxSizeReachedError ) ; ok {
w . WriteHeader ( http . StatusRequestEntityTooLarge )
w . Write ( [ ] byte ( http . StatusText ( http . StatusRequestEntityTooLarge ) ) )
return
}
utils . DefaultHandler . ServeHTTP ( w , req , err )
}