Disable closeNotify when method GET for http pipelining

This commit is contained in:
SALLEYRON Julien 2018-04-10 17:24:04 +02:00 committed by Traefiker Bot
parent f35d574759
commit 2387010556
7 changed files with 94 additions and 45 deletions

2
Gopkg.lock generated
View file

@ -1215,7 +1215,7 @@
"roundrobin", "roundrobin",
"utils" "utils"
] ]
revision = "dacf34285ce530b272e9fe04d2f45f52e6374e36" revision = "6956548a7fa4272adeadf828455109c53933ea86"
[[projects]] [[projects]]
name = "github.com/vulcand/predicate" name = "github.com/vulcand/predicate"

View file

@ -349,9 +349,10 @@ func (b *bufferWriter) expectBody(r *http.Request) bool {
if (b.code >= 100 && b.code < 200) || b.code == 204 || b.code == 304 { if (b.code >= 100 && b.code < 200) || b.code == 204 || b.code == 304 {
return false return false
} }
if b.header.Get("Content-Length") == "" && b.header.Get("Transfer-Encoding") == "" { // refer to https://github.com/vulcand/oxy/issues/113
return false // if b.header.Get("Content-Length") == "" && b.header.Get("Transfer-Encoding") == "" {
} // return false
// }
if b.header.Get("Content-Length") == "0" { if b.header.Get("Content-Length") == "0" {
return false return false
} }

View file

@ -156,12 +156,12 @@ func (c *CircuitBreaker) activateFallback(w http.ResponseWriter, req *http.Reque
func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) { func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) {
start := c.clock.UtcNow() start := c.clock.UtcNow()
p := &utils.ProxyWriter{W: w} p := utils.NewSimpleProxyWriter(w)
c.next.ServeHTTP(p, req) c.next.ServeHTTP(p, req)
latency := c.clock.UtcNow().Sub(start) latency := c.clock.UtcNow().Sub(start)
c.metrics.Record(p.Code, latency) c.metrics.Record(p.StatusCode(), latency)
// Note that this call is less expensive than it looks -- checkCondition only performs the real check // Note that this call is less expensive than it looks -- checkCondition only performs the real check
// periodically. Because of that we can afford to call it here on every single response. // periodically. Because of that we can afford to call it here on every single response.

View file

@ -221,7 +221,9 @@ func New(setters ...optSetter) (*Forwarder, error) {
} }
if f.tlsClientConfig == nil { if f.tlsClientConfig == nil {
f.tlsClientConfig = f.httpForwarder.roundTripper.(*http.Transport).TLSClientConfig if ht, ok := f.httpForwarder.roundTripper.(*http.Transport); ok {
f.tlsClientConfig = ht.TLSClientConfig
}
} }
f.httpForwarder.roundTripper = ErrorHandlingRoundTripper{ f.httpForwarder.roundTripper = ErrorHandlingRoundTripper{
@ -444,9 +446,16 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct
defer logEntry.Debug("vulcand/oxy/forward/http: completed ServeHttp on request") defer logEntry.Debug("vulcand/oxy/forward/http: completed ServeHttp on request")
} }
pw := &utils.ProxyWriter{ var pw utils.ProxyWriter
W: w,
// Disable closeNotify when method GET for http pipelining
// Waiting for https://github.com/golang/go/issues/23921
if inReq.Method == http.MethodGet {
pw = utils.NewProxyWriterWithoutCloseNotify(w)
} else {
pw = utils.NewSimpleProxyWriter(w)
} }
start := time.Now().UTC() start := time.Now().UTC()
outReq := new(http.Request) outReq := new(http.Request)
@ -464,14 +473,14 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct
if inReq.TLS != nil { if inReq.TLS != nil {
f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v tls:version: %x, tls:resume:%t, tls:csuite:%x, tls:server:%v", f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v tls:version: %x, tls:resume:%t, tls:csuite:%x, tls:server:%v",
inReq.URL, pw.Code, pw.Length, time.Now().UTC().Sub(start), inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start),
inReq.TLS.Version, inReq.TLS.Version,
inReq.TLS.DidResume, inReq.TLS.DidResume,
inReq.TLS.CipherSuite, inReq.TLS.CipherSuite,
inReq.TLS.ServerName) inReq.TLS.ServerName)
} else { } else {
f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v", f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v",
inReq.URL, pw.Code, pw.Length, time.Now().UTC().Sub(start)) inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start))
} }
} }

View file

@ -148,7 +148,7 @@ func (rb *Rebalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer logEntry.Debug("vulcand/oxy/roundrobin/rebalancer: competed ServeHttp on request") defer logEntry.Debug("vulcand/oxy/roundrobin/rebalancer: competed ServeHttp on request")
} }
pw := &utils.ProxyWriter{W: w} pw := utils.NewSimpleProxyWriter(w)
start := rb.clock.UtcNow() start := rb.clock.UtcNow()
// make shallow copy of request before changing anything to avoid side effects // make shallow copy of request before changing anything to avoid side effects
@ -194,7 +194,7 @@ func (rb *Rebalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
rb.next.Next().ServeHTTP(pw, &newReq) rb.next.Next().ServeHTTP(pw, &newReq)
rb.recordMetrics(newReq.URL, pw.Code, rb.clock.UtcNow().Sub(start)) rb.recordMetrics(newReq.URL, pw.StatusCode(), rb.clock.UtcNow().Sub(start))
rb.adjustWeights() rb.adjustWeights()
} }

View file

@ -185,43 +185,43 @@ func (r *RoundRobin) RemoveServer(u *url.URL) error {
return nil return nil
} }
func (rr *RoundRobin) Servers() []*url.URL { func (r *RoundRobin) Servers() []*url.URL {
rr.mutex.Lock() r.mutex.Lock()
defer rr.mutex.Unlock() defer r.mutex.Unlock()
out := make([]*url.URL, len(rr.servers)) out := make([]*url.URL, len(r.servers))
for i, srv := range rr.servers { for i, srv := range r.servers {
out[i] = srv.url out[i] = srv.url
} }
return out return out
} }
func (rr *RoundRobin) ServerWeight(u *url.URL) (int, bool) { func (r *RoundRobin) ServerWeight(u *url.URL) (int, bool) {
rr.mutex.Lock() r.mutex.Lock()
defer rr.mutex.Unlock() defer r.mutex.Unlock()
if s, _ := rr.findServerByURL(u); s != nil { if s, _ := r.findServerByURL(u); s != nil {
return s.weight, true return s.weight, true
} }
return -1, false return -1, false
} }
// In case if server is already present in the load balancer, returns error // In case if server is already present in the load balancer, returns error
func (rr *RoundRobin) UpsertServer(u *url.URL, options ...ServerOption) error { func (r *RoundRobin) UpsertServer(u *url.URL, options ...ServerOption) error {
rr.mutex.Lock() r.mutex.Lock()
defer rr.mutex.Unlock() defer r.mutex.Unlock()
if u == nil { if u == nil {
return fmt.Errorf("server URL can't be nil") return fmt.Errorf("server URL can't be nil")
} }
if s, _ := rr.findServerByURL(u); s != nil { if s, _ := r.findServerByURL(u); s != nil {
for _, o := range options { for _, o := range options {
if err := o(s); err != nil { if err := o(s); err != nil {
return err return err
} }
} }
rr.resetState() r.resetState()
return nil return nil
} }
@ -236,8 +236,8 @@ func (rr *RoundRobin) UpsertServer(u *url.URL, options ...ServerOption) error {
srv.weight = defaultWeight srv.weight = defaultWeight
} }
rr.servers = append(rr.servers, srv) r.servers = append(r.servers, srv)
rr.resetState() r.resetState()
return nil return nil
} }
@ -262,9 +262,9 @@ func (r *RoundRobin) findServerByURL(u *url.URL) (*server, int) {
return nil, -1 return nil, -1
} }
func (rr *RoundRobin) maxWeight() int { func (r *RoundRobin) maxWeight() int {
max := -1 max := -1
for _, s := range rr.servers { for _, s := range r.servers {
if s.weight > max { if s.weight > max {
max = s.weight max = s.weight
} }
@ -272,9 +272,9 @@ func (rr *RoundRobin) maxWeight() int {
return max return max
} }
func (rr *RoundRobin) weightGcd() int { func (r *RoundRobin) weightGcd() int {
divisor := -1 divisor := -1
for _, s := range rr.servers { for _, s := range r.servers {
if divisor == -1 { if divisor == -1 {
divisor = s.weight divisor = s.weight
} else { } else {
@ -304,7 +304,15 @@ type server struct {
weight int weight int
} }
const defaultWeight = 1 var defaultWeight = 1
func SetDefaultWeight(weight int) error {
if weight < 0 {
return fmt.Errorf("default weight should be >= 0")
}
defaultWeight = weight
return nil
}
func sameURL(a, b *url.URL) bool { func sameURL(a, b *url.URL) bool {
return a.Path == b.Path && a.Host == b.Host && a.Scheme == b.Scheme return a.Path == b.Path && a.Host == b.Host && a.Scheme == b.Scheme

View file

@ -12,16 +12,43 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// ProxyWriter helps to capture response headers and status code type ProxyWriter interface {
http.ResponseWriter
GetLength() int64
StatusCode() int
GetWriter() http.ResponseWriter
}
// ProxyWriterWithoutCloseNotify helps to capture response headers and status code
// from the ServeHTTP. It can be safely passed to ServeHTTP handler, // from the ServeHTTP. It can be safely passed to ServeHTTP handler,
// wrapping the real response writer. // wrapping the real response writer.
type ProxyWriter struct { type ProxyWriterWithoutCloseNotify struct {
W http.ResponseWriter W http.ResponseWriter
Code int Code int
Length int64 Length int64
} }
func (p *ProxyWriter) StatusCode() int { func NewProxyWriterWithoutCloseNotify(writer http.ResponseWriter) *ProxyWriterWithoutCloseNotify {
return &ProxyWriterWithoutCloseNotify{
W: writer,
}
}
func NewSimpleProxyWriter(writer http.ResponseWriter) *SimpleProxyWriter {
return &SimpleProxyWriter{
ProxyWriterWithoutCloseNotify: NewProxyWriterWithoutCloseNotify(writer),
}
}
type SimpleProxyWriter struct {
*ProxyWriterWithoutCloseNotify
}
func (p *ProxyWriterWithoutCloseNotify) GetWriter() http.ResponseWriter {
return p.W
}
func (p *ProxyWriterWithoutCloseNotify) StatusCode() int {
if p.Code == 0 { if p.Code == 0 {
// per contract standard lib will set this to http.StatusOK if not set // per contract standard lib will set this to http.StatusOK if not set
// by user, here we avoid the confusion by mirroring this logic // by user, here we avoid the confusion by mirroring this logic
@ -30,35 +57,39 @@ func (p *ProxyWriter) StatusCode() int {
return p.Code return p.Code
} }
func (p *ProxyWriter) Header() http.Header { func (p *ProxyWriterWithoutCloseNotify) Header() http.Header {
return p.W.Header() return p.W.Header()
} }
func (p *ProxyWriter) Write(buf []byte) (int, error) { func (p *ProxyWriterWithoutCloseNotify) Write(buf []byte) (int, error) {
p.Length = p.Length + int64(len(buf)) p.Length = p.Length + int64(len(buf))
return p.W.Write(buf) return p.W.Write(buf)
} }
func (p *ProxyWriter) WriteHeader(code int) { func (p *ProxyWriterWithoutCloseNotify) WriteHeader(code int) {
p.Code = code p.Code = code
p.W.WriteHeader(code) p.W.WriteHeader(code)
} }
func (p *ProxyWriter) Flush() { func (p *ProxyWriterWithoutCloseNotify) Flush() {
if f, ok := p.W.(http.Flusher); ok { if f, ok := p.W.(http.Flusher); ok {
f.Flush() f.Flush()
} }
} }
func (p *ProxyWriter) CloseNotify() <-chan bool { func (p *ProxyWriterWithoutCloseNotify) GetLength() int64 {
if cn, ok := p.W.(http.CloseNotifier); ok { return p.Length
}
func (p *SimpleProxyWriter) CloseNotify() <-chan bool {
if cn, ok := p.GetWriter().(http.CloseNotifier); ok {
return cn.CloseNotify() return cn.CloseNotify()
} }
log.Warningf("Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel.", reflect.TypeOf(p.W)) log.Warningf("Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel.", reflect.TypeOf(p.GetWriter()))
return make(<-chan bool) return make(<-chan bool)
} }
func (p *ProxyWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { func (p *ProxyWriterWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if hi, ok := p.W.(http.Hijacker); ok { if hi, ok := p.W.(http.Hijacker); ok {
return hi.Hijack() return hi.Hijack()
} }