Create middleware to be able to handle HTTP pipelining correctly
This commit is contained in:
parent
1c3e4124f8
commit
aa705dd691
8 changed files with 179 additions and 73 deletions
2
Gopkg.lock
generated
2
Gopkg.lock
generated
|
@ -1217,7 +1217,7 @@
|
||||||
"roundrobin",
|
"roundrobin",
|
||||||
"utils"
|
"utils"
|
||||||
]
|
]
|
||||||
revision = "d5b73186eed4aa34b52748699ad19e90f61d4059"
|
revision = "c2414f4542f085363f490048da2fbec5e4537eb6"
|
||||||
|
|
||||||
[[projects]]
|
[[projects]]
|
||||||
name = "github.com/vulcand/predicate"
|
name = "github.com/vulcand/predicate"
|
||||||
|
|
62
middlewares/pipelining/pipelining.go
Normal file
62
middlewares/pipelining/pipelining.go
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
package pipelining
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Pipelining returns a middleware
|
||||||
|
type Pipelining struct {
|
||||||
|
next http.Handler
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPipelining returns a new Pipelining instance
|
||||||
|
func NewPipelining(next http.Handler) *Pipelining {
|
||||||
|
return &Pipelining{
|
||||||
|
next: next,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pipelining) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
// https://github.com/golang/go/blob/3d59583836630cf13ec4bfbed977d27b1b7adbdc/src/net/http/server.go#L201-L218
|
||||||
|
if r.Method == http.MethodPut || r.Method == http.MethodPost {
|
||||||
|
p.next.ServeHTTP(rw, r)
|
||||||
|
} else {
|
||||||
|
p.next.ServeHTTP(&writerWithoutCloseNotify{rw}, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// writerWithoutCloseNotify helps to disable closeNotify
|
||||||
|
type writerWithoutCloseNotify struct {
|
||||||
|
W http.ResponseWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
// Header returns the response headers.
|
||||||
|
func (w *writerWithoutCloseNotify) Header() http.Header {
|
||||||
|
return w.W.Header()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write writes the data to the connection as part of an HTTP reply.
|
||||||
|
func (w *writerWithoutCloseNotify) Write(buf []byte) (int, error) {
|
||||||
|
return w.W.Write(buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteHeader sends an HTTP response header with the provided
|
||||||
|
// status code.
|
||||||
|
func (w *writerWithoutCloseNotify) WriteHeader(code int) {
|
||||||
|
w.W.WriteHeader(code)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush sends any buffered data to the client.
|
||||||
|
func (w *writerWithoutCloseNotify) Flush() {
|
||||||
|
if f, ok := w.W.(http.Flusher); ok {
|
||||||
|
f.Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hijack hijacks the connection.
|
||||||
|
func (w *writerWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||||
|
return w.W.(http.Hijacker).Hijack()
|
||||||
|
}
|
69
middlewares/pipelining/pipelining_test.go
Normal file
69
middlewares/pipelining/pipelining_test.go
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
package pipelining
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
type recorderWithCloseNotify struct {
|
||||||
|
*httptest.ResponseRecorder
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *recorderWithCloseNotify) CloseNotify() <-chan bool {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewPipelining(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
desc string
|
||||||
|
HTTPMethod string
|
||||||
|
implementCloseNotifier bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "should not implement CloseNotifier with GET method",
|
||||||
|
HTTPMethod: http.MethodGet,
|
||||||
|
implementCloseNotifier: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "should implement CloseNotifier with PUT method",
|
||||||
|
HTTPMethod: http.MethodPut,
|
||||||
|
implementCloseNotifier: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "should implement CloseNotifier with POST method",
|
||||||
|
HTTPMethod: http.MethodPost,
|
||||||
|
implementCloseNotifier: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "should not implement CloseNotifier with GET method",
|
||||||
|
HTTPMethod: http.MethodHead,
|
||||||
|
implementCloseNotifier: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "should not implement CloseNotifier with PROPFIND method",
|
||||||
|
HTTPMethod: "PROPFIND",
|
||||||
|
implementCloseNotifier: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range testCases {
|
||||||
|
test := test
|
||||||
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, ok := w.(http.CloseNotifier)
|
||||||
|
assert.Equal(t, test.implementCloseNotifier, ok)
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
})
|
||||||
|
handler := NewPipelining(nextHandler)
|
||||||
|
|
||||||
|
req := httptest.NewRequest(test.HTTPMethod, "http://localhost", nil)
|
||||||
|
|
||||||
|
handler.ServeHTTP(&recorderWithCloseNotify{httptest.NewRecorder()}, req)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -32,6 +32,7 @@ import (
|
||||||
"github.com/containous/traefik/middlewares/accesslog"
|
"github.com/containous/traefik/middlewares/accesslog"
|
||||||
mauth "github.com/containous/traefik/middlewares/auth"
|
mauth "github.com/containous/traefik/middlewares/auth"
|
||||||
"github.com/containous/traefik/middlewares/errorpages"
|
"github.com/containous/traefik/middlewares/errorpages"
|
||||||
|
"github.com/containous/traefik/middlewares/pipelining"
|
||||||
"github.com/containous/traefik/middlewares/redirect"
|
"github.com/containous/traefik/middlewares/redirect"
|
||||||
"github.com/containous/traefik/middlewares/tracing"
|
"github.com/containous/traefik/middlewares/tracing"
|
||||||
"github.com/containous/traefik/provider"
|
"github.com/containous/traefik/provider"
|
||||||
|
@ -1023,6 +1024,8 @@ func (s *Server) loadConfig(configurations types.Configurations, globalConfigura
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fwd = pipelining.NewPipelining(fwd)
|
||||||
|
|
||||||
var rr *roundrobin.RoundRobin
|
var rr *roundrobin.RoundRobin
|
||||||
var saveFrontend http.Handler
|
var saveFrontend http.Handler
|
||||||
if s.accessLoggerMiddleware != nil {
|
if s.accessLoggerMiddleware != nil {
|
||||||
|
|
2
vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go
generated
vendored
2
vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go
generated
vendored
|
@ -156,7 +156,7 @@ func (c *CircuitBreaker) activateFallback(w http.ResponseWriter, req *http.Reque
|
||||||
|
|
||||||
func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) {
|
func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) {
|
||||||
start := c.clock.UtcNow()
|
start := c.clock.UtcNow()
|
||||||
p := utils.NewSimpleProxyWriter(w)
|
p := utils.NewProxyWriter(w)
|
||||||
|
|
||||||
c.next.ServeHTTP(p, req)
|
c.next.ServeHTTP(p, req)
|
||||||
|
|
||||||
|
|
16
vendor/github.com/vulcand/oxy/forward/fwd.go
generated
vendored
16
vendor/github.com/vulcand/oxy/forward/fwd.go
generated
vendored
|
@ -466,16 +466,6 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct
|
||||||
defer logEntry.Debug("vulcand/oxy/forward/http: completed ServeHttp on request")
|
defer logEntry.Debug("vulcand/oxy/forward/http: completed ServeHttp on request")
|
||||||
}
|
}
|
||||||
|
|
||||||
var pw utils.ProxyWriter
|
|
||||||
|
|
||||||
// Disable closeNotify when method GET for http pipelining
|
|
||||||
// Waiting for https://github.com/golang/go/issues/23921
|
|
||||||
if inReq.Method == http.MethodGet {
|
|
||||||
pw = utils.NewProxyWriterWithoutCloseNotify(w)
|
|
||||||
} else {
|
|
||||||
pw = utils.NewSimpleProxyWriter(w)
|
|
||||||
}
|
|
||||||
|
|
||||||
start := time.Now().UTC()
|
start := time.Now().UTC()
|
||||||
|
|
||||||
outReq := new(http.Request)
|
outReq := new(http.Request)
|
||||||
|
@ -490,6 +480,9 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct
|
||||||
ModifyResponse: f.modifyResponse,
|
ModifyResponse: f.modifyResponse,
|
||||||
BufferPool: f.bufferPool,
|
BufferPool: f.bufferPool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if f.log.GetLevel() >= log.DebugLevel {
|
||||||
|
pw := utils.NewProxyWriter(w)
|
||||||
revproxy.ServeHTTP(pw, outReq)
|
revproxy.ServeHTTP(pw, outReq)
|
||||||
|
|
||||||
if inReq.TLS != nil {
|
if inReq.TLS != nil {
|
||||||
|
@ -503,6 +496,9 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct
|
||||||
f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v",
|
f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v",
|
||||||
inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start))
|
inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start))
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
revproxy.ServeHTTP(w, outReq)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// isWebsocketRequest determines if the specified HTTP request is a
|
// isWebsocketRequest determines if the specified HTTP request is a
|
||||||
|
|
2
vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go
generated
vendored
2
vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go
generated
vendored
|
@ -148,7 +148,7 @@ func (rb *Rebalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||||
defer logEntry.Debug("vulcand/oxy/roundrobin/rebalancer: completed ServeHttp on request")
|
defer logEntry.Debug("vulcand/oxy/roundrobin/rebalancer: completed ServeHttp on request")
|
||||||
}
|
}
|
||||||
|
|
||||||
pw := utils.NewSimpleProxyWriter(w)
|
pw := utils.NewProxyWriter(w)
|
||||||
start := rb.clock.UtcNow()
|
start := rb.clock.UtcNow()
|
||||||
|
|
||||||
// make shallow copy of request before changing anything to avoid side effects
|
// make shallow copy of request before changing anything to avoid side effects
|
||||||
|
|
76
vendor/github.com/vulcand/oxy/utils/netutils.go
generated
vendored
76
vendor/github.com/vulcand/oxy/utils/netutils.go
generated
vendored
|
@ -12,89 +12,65 @@ import (
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ProxyWriter interface {
|
type ProxyWriter struct {
|
||||||
http.ResponseWriter
|
|
||||||
GetLength() int64
|
|
||||||
StatusCode() int
|
|
||||||
GetWriter() http.ResponseWriter
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProxyWriterWithoutCloseNotify helps to capture response headers and status code
|
|
||||||
// from the ServeHTTP. It can be safely passed to ServeHTTP handler,
|
|
||||||
// wrapping the real response writer.
|
|
||||||
type ProxyWriterWithoutCloseNotify struct {
|
|
||||||
W http.ResponseWriter
|
W http.ResponseWriter
|
||||||
Code int
|
code int
|
||||||
Length int64
|
length int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProxyWriterWithoutCloseNotify(writer http.ResponseWriter) *ProxyWriterWithoutCloseNotify {
|
func NewProxyWriter(writer http.ResponseWriter) *ProxyWriter {
|
||||||
return &ProxyWriterWithoutCloseNotify{
|
return &ProxyWriter{
|
||||||
W: writer,
|
W: writer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSimpleProxyWriter(writer http.ResponseWriter) *SimpleProxyWriter {
|
func (p *ProxyWriter) StatusCode() int {
|
||||||
return &SimpleProxyWriter{
|
if p.code == 0 {
|
||||||
ProxyWriterWithoutCloseNotify: NewProxyWriterWithoutCloseNotify(writer),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type SimpleProxyWriter struct {
|
|
||||||
*ProxyWriterWithoutCloseNotify
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *ProxyWriterWithoutCloseNotify) GetWriter() http.ResponseWriter {
|
|
||||||
return p.W
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *ProxyWriterWithoutCloseNotify) StatusCode() int {
|
|
||||||
if p.Code == 0 {
|
|
||||||
// per contract standard lib will set this to http.StatusOK if not set
|
// per contract standard lib will set this to http.StatusOK if not set
|
||||||
// by user, here we avoid the confusion by mirroring this logic
|
// by user, here we avoid the confusion by mirroring this logic
|
||||||
return http.StatusOK
|
return http.StatusOK
|
||||||
}
|
}
|
||||||
return p.Code
|
return p.code
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProxyWriterWithoutCloseNotify) Header() http.Header {
|
func (p *ProxyWriter) GetLength() int64 {
|
||||||
|
return p.length
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ProxyWriter) Header() http.Header {
|
||||||
return p.W.Header()
|
return p.W.Header()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProxyWriterWithoutCloseNotify) Write(buf []byte) (int, error) {
|
func (p *ProxyWriter) Write(buf []byte) (int, error) {
|
||||||
p.Length = p.Length + int64(len(buf))
|
p.length = p.length + int64(len(buf))
|
||||||
return p.W.Write(buf)
|
return p.W.Write(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProxyWriterWithoutCloseNotify) WriteHeader(code int) {
|
func (p *ProxyWriter) WriteHeader(code int) {
|
||||||
p.Code = code
|
p.code = code
|
||||||
p.W.WriteHeader(code)
|
p.W.WriteHeader(code)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProxyWriterWithoutCloseNotify) Flush() {
|
func (p *ProxyWriter) Flush() {
|
||||||
if f, ok := p.W.(http.Flusher); ok {
|
if f, ok := p.W.(http.Flusher); ok {
|
||||||
f.Flush()
|
f.Flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProxyWriterWithoutCloseNotify) GetLength() int64 {
|
func (p *ProxyWriter) CloseNotify() <-chan bool {
|
||||||
return p.Length
|
if cn, ok := p.W.(http.CloseNotifier); ok {
|
||||||
}
|
|
||||||
|
|
||||||
func (p *SimpleProxyWriter) CloseNotify() <-chan bool {
|
|
||||||
if cn, ok := p.GetWriter().(http.CloseNotifier); ok {
|
|
||||||
return cn.CloseNotify()
|
return cn.CloseNotify()
|
||||||
}
|
}
|
||||||
log.Warningf("Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel.", reflect.TypeOf(p.GetWriter()))
|
log.Debugf("Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel.", reflect.TypeOf(p.W))
|
||||||
return make(<-chan bool)
|
return make(<-chan bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ProxyWriterWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
func (p *ProxyWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||||
if hi, ok := p.W.(http.Hijacker); ok {
|
if hi, ok := p.W.(http.Hijacker); ok {
|
||||||
return hi.Hijack()
|
return hi.Hijack()
|
||||||
}
|
}
|
||||||
log.Warningf("Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel.", reflect.TypeOf(p.W))
|
log.Debugf("Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel.", reflect.TypeOf(p.W))
|
||||||
return nil, nil, fmt.Errorf("The response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v", reflect.TypeOf(p.W))
|
return nil, nil, fmt.Errorf("the response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v", reflect.TypeOf(p.W))
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBufferWriter(w io.WriteCloser) *BufferWriter {
|
func NewBufferWriter(w io.WriteCloser) *BufferWriter {
|
||||||
|
@ -139,8 +115,8 @@ func (b *BufferWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||||
if hi, ok := b.W.(http.Hijacker); ok {
|
if hi, ok := b.W.(http.Hijacker); ok {
|
||||||
return hi.Hijack()
|
return hi.Hijack()
|
||||||
}
|
}
|
||||||
log.Warningf("Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel.", reflect.TypeOf(b.W))
|
log.Debugf("Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel.", reflect.TypeOf(b.W))
|
||||||
return nil, nil, fmt.Errorf("The response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v", reflect.TypeOf(b.W))
|
return nil, nil, fmt.Errorf("the response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v", reflect.TypeOf(b.W))
|
||||||
}
|
}
|
||||||
|
|
||||||
type nopWriteCloser struct {
|
type nopWriteCloser struct {
|
||||||
|
|
Loading…
Add table
Reference in a new issue