Logger and Leaks

This commit is contained in:
Ludovic Fernandez 2018-02-12 17:24:03 +01:00 committed by Traefiker Bot
parent 91fa727c74
commit 38a4c80995
11 changed files with 108 additions and 82 deletions

2
Gopkg.lock generated
View file

@ -1148,7 +1148,7 @@
"roundrobin",
"utils"
]
revision = "092a2d70bb8859a9def2b1bb9a61cdc193ee55cc"
revision = "6a7a2a4e0782bf83cfe731d59a219178f2c26be9"
[[projects]]
name = "github.com/vulcand/predicate"

View file

@ -1491,22 +1491,12 @@ func (s *Server) buildBufferingMiddleware(handler http.Handler, config *types.Bu
config.MemRequestBodyBytes, config.MaxRequestBodyBytes, config.MemResponseBodyBytes,
config.MaxResponseBodyBytes, config.RetryExpression)
if len(config.RetryExpression) > 0 {
return buffer.New(
handler,
buffer.MemRequestBodyBytes(config.MemRequestBodyBytes),
buffer.MaxRequestBodyBytes(config.MaxRequestBodyBytes),
buffer.MemResponseBodyBytes(config.MemResponseBodyBytes),
buffer.MaxResponseBodyBytes(config.MaxResponseBodyBytes),
buffer.Retry(config.RetryExpression),
)
}
return buffer.New(
handler,
buffer.MemRequestBodyBytes(config.MemRequestBodyBytes),
buffer.MaxRequestBodyBytes(config.MaxRequestBodyBytes),
buffer.MemResponseBodyBytes(config.MemResponseBodyBytes),
buffer.MaxResponseBodyBytes(config.MaxResponseBodyBytes),
buffer.CondSetter(len(config.RetryExpression) > 0, buffer.Retry(config.RetryExpression)),
)
}

View file

@ -99,7 +99,19 @@ func New(next http.Handler, setters ...optSetter) (*Buffer, error) {
return strm, nil
}
type optSetter func(s *Buffer) error
type optSetter func(b *Buffer) error
// CondSetter Conditional setter.
// ex: Cond(a > 4, MemRequestBodyBytes(a))
func CondSetter(condition bool, setter optSetter) optSetter {
if condition {
// NoOp setter
return func(*Buffer) error {
return nil
}
}
return setter
}
// Retry provides a predicate that allows buffer middleware to replay the request
// if it matches certain condition, e.g. returns special error code. Available functions are:
@ -113,31 +125,31 @@ type optSetter func(s *Buffer) error
// `Attempts() <= 2 && ResponseCode() == 502`
//
func Retry(predicate string) optSetter {
return func(s *Buffer) error {
return func(b *Buffer) error {
p, err := parseExpression(predicate)
if err != nil {
return err
}
s.retryPredicate = p
b.retryPredicate = p
return nil
}
}
// ErrorHandler sets error handler of the server
func ErrorHandler(h utils.ErrorHandler) optSetter {
return func(s *Buffer) error {
s.errHandler = h
return func(b *Buffer) error {
b.errHandler = h
return nil
}
}
// MaxRequestBodyBytes sets the maximum request body size in bytes
func MaxRequestBodyBytes(m int64) optSetter {
return func(s *Buffer) error {
return func(b *Buffer) error {
if m < 0 {
return fmt.Errorf("max bytes should be >= 0 got %d", m)
}
s.maxRequestBodyBytes = m
b.maxRequestBodyBytes = m
return nil
}
}
@ -145,22 +157,22 @@ func MaxRequestBodyBytes(m int64) optSetter {
// MaxRequestBody bytes sets the maximum request body to be stored in memory
// buffer middleware will serialize the excess to disk.
func MemRequestBodyBytes(m int64) optSetter {
return func(s *Buffer) error {
return func(b *Buffer) error {
if m < 0 {
return fmt.Errorf("mem bytes should be >= 0 got %d", m)
}
s.memRequestBodyBytes = m
b.memRequestBodyBytes = m
return nil
}
}
// MaxResponseBodyBytes sets the maximum request body size in bytes
func MaxResponseBodyBytes(m int64) optSetter {
return func(s *Buffer) error {
return func(b *Buffer) error {
if m < 0 {
return fmt.Errorf("max bytes should be >= 0 got %d", m)
}
s.maxResponseBodyBytes = m
b.maxResponseBodyBytes = m
return nil
}
}
@ -168,31 +180,31 @@ func MaxResponseBodyBytes(m int64) optSetter {
// MemResponseBodyBytes sets the maximum request body to be stored in memory
// buffer middleware will serialize the excess to disk.
func MemResponseBodyBytes(m int64) optSetter {
return func(s *Buffer) error {
return func(b *Buffer) error {
if m < 0 {
return fmt.Errorf("mem bytes should be >= 0 got %d", m)
}
s.memResponseBodyBytes = m
b.memResponseBodyBytes = m
return nil
}
}
// Wrap sets the next handler to be called by buffer handler.
func (s *Buffer) Wrap(next http.Handler) error {
s.next = next
func (b *Buffer) Wrap(next http.Handler) error {
b.next = next
return nil
}
func (s *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (b *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if log.GetLevel() >= log.DebugLevel {
logEntry := log.WithField("Request", utils.DumpHttpRequest(req))
logEntry.Debug("vulcand/oxy/buffer: begin ServeHttp on request")
defer logEntry.Debug("vulcand/oxy/buffer: competed ServeHttp on request")
}
if err := s.checkLimit(req); err != nil {
if err := b.checkLimit(req); err != nil {
log.Errorf("vulcand/oxy/buffer: request body over limit, err: %v", err)
s.errHandler.ServeHTTP(w, req, err)
b.errHandler.ServeHTTP(w, req, err)
return
}
@ -200,10 +212,10 @@ func (s *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// to read into memory and disk. This reader returns an error if the total request size exceeds the
// prefefined MaxSizeBytes. This can occur if we got chunked request, in this case ContentLength would be set to -1
// and the reader would be unbounded bufio in the http.Server
body, err := multibuf.New(req.Body, multibuf.MaxBytes(s.maxRequestBodyBytes), multibuf.MemBytes(s.memRequestBodyBytes))
body, err := multibuf.New(req.Body, multibuf.MaxBytes(b.maxRequestBodyBytes), multibuf.MemBytes(b.memRequestBodyBytes))
if err != nil || body == nil {
log.Errorf("vulcand/oxy/buffer: error when reading request body, err: %v", err)
s.errHandler.ServeHTTP(w, req, err)
b.errHandler.ServeHTTP(w, req, err)
return
}
@ -224,7 +236,7 @@ func (s *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
totalSize, err := body.Size()
if err != nil {
log.Errorf("vulcand/oxy/buffer: failed to get request size, err: %v", err)
s.errHandler.ServeHTTP(w, req, err)
b.errHandler.ServeHTTP(w, req, err)
return
}
@ -232,48 +244,48 @@ func (s *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
body = nil
}
outreq := s.copyRequest(req, body, totalSize)
outreq := b.copyRequest(req, body, totalSize)
attempt := 1
for {
// We create a special writer that will limit the response size, buffer it to disk if necessary
writer, err := multibuf.NewWriterOnce(multibuf.MaxBytes(s.maxResponseBodyBytes), multibuf.MemBytes(s.memResponseBodyBytes))
writer, err := multibuf.NewWriterOnce(multibuf.MaxBytes(b.maxResponseBodyBytes), multibuf.MemBytes(b.memResponseBodyBytes))
if err != nil {
log.Errorf("vulcand/oxy/buffer: failed create response writer, err: %v", err)
s.errHandler.ServeHTTP(w, req, err)
b.errHandler.ServeHTTP(w, req, err)
return
}
// We are mimicking http.ResponseWriter to replace writer with our special writer
b := &bufferWriter{
bw := &bufferWriter{
header: make(http.Header),
buffer: writer,
responseWriter: w,
}
defer b.Close()
defer bw.Close()
s.next.ServeHTTP(b, outreq)
if b.hijacked {
log.Infof("vulcand/oxy/buffer: connection was hijacked downstream. Not taking any action in buffer.")
b.next.ServeHTTP(bw, outreq)
if bw.hijacked {
log.Debugf("vulcand/oxy/buffer: connection was hijacked downstream. Not taking any action in buffer.")
return
}
var reader multibuf.MultiReader
if b.expectBody(outreq) {
if bw.expectBody(outreq) {
rdr, err := writer.Reader()
if err != nil {
log.Errorf("vulcand/oxy/buffer: failed to read response, err: %v", err)
s.errHandler.ServeHTTP(w, req, err)
b.errHandler.ServeHTTP(w, req, err)
return
}
defer rdr.Close()
reader = rdr
}
if (s.retryPredicate == nil || attempt > DefaultMaxRetryAttempts) ||
!s.retryPredicate(&context{r: req, attempt: attempt, responseCode: b.code}) {
utils.CopyHeaders(w.Header(), b.Header())
w.WriteHeader(b.code)
if (b.retryPredicate == nil || attempt > DefaultMaxRetryAttempts) ||
!b.retryPredicate(&context{r: req, attempt: attempt, responseCode: bw.code}) {
utils.CopyHeaders(w.Header(), bw.Header())
w.WriteHeader(bw.code)
if reader != nil {
io.Copy(w, reader)
}
@ -284,17 +296,17 @@ func (s *Buffer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if body != nil {
if _, err := body.Seek(0, 0); err != nil {
log.Errorf("vulcand/oxy/buffer: failed to rewind response body, err: %v", err)
s.errHandler.ServeHTTP(w, req, err)
b.errHandler.ServeHTTP(w, req, err)
return
}
}
outreq = s.copyRequest(req, body, totalSize)
log.Infof("vulcand/oxy/buffer: retry Request(%v %v) attempt %v", req.Method, req.URL, attempt)
outreq = b.copyRequest(req, body, totalSize)
log.Debugf("vulcand/oxy/buffer: retry Request(%v %v) attempt %v", req.Method, req.URL, attempt)
}
}
func (s *Buffer) copyRequest(req *http.Request, body io.ReadCloser, bodySize int64) *http.Request {
func (b *Buffer) copyRequest(req *http.Request, body io.ReadCloser, bodySize int64) *http.Request {
o := *req
o.URL = utils.CopyURL(req.URL)
o.Header = make(http.Header)
@ -311,12 +323,12 @@ func (s *Buffer) copyRequest(req *http.Request, body io.ReadCloser, bodySize int
return &o
}
func (s *Buffer) checkLimit(req *http.Request) error {
if s.maxRequestBodyBytes <= 0 {
func (b *Buffer) checkLimit(req *http.Request) error {
if b.maxRequestBodyBytes <= 0 {
return nil
}
if req.ContentLength > s.maxRequestBodyBytes {
return &multibuf.MaxSizeReachedError{MaxSize: s.maxRequestBodyBytes}
if req.ContentLength > b.maxRequestBodyBytes {
return &multibuf.MaxSizeReachedError{MaxSize: b.maxRequestBodyBytes}
}
return nil
}

View file

@ -126,7 +126,7 @@ func (c *CircuitBreaker) activateFallback(w http.ResponseWriter, req *http.Reque
c.m.Lock()
defer c.m.Unlock()
log.Infof("%v is in error state", c)
log.Warnf("%v is in error state", c)
switch c.state {
case stateStandby:
@ -197,7 +197,7 @@ func (c *CircuitBreaker) exec(s SideEffect) {
}
func (c *CircuitBreaker) setState(new cbState, until time.Time) {
log.Infof("%v setting state to %v, until %v", c, new, until)
log.Debugf("%v setting state to %v, until %v", c, new, until)
c.state = new
c.until = until
switch new {
@ -230,7 +230,7 @@ func (c *CircuitBreaker) checkAndSet() {
c.lastCheck = c.clock.UtcNow().Add(c.checkPeriod)
if c.state == stateTripped {
log.Infof("%v skip set tripped", c)
log.Debugf("%v skip set tripped", c)
return
}

View file

@ -73,6 +73,6 @@ func (w *WebhookSideEffect) Exec() error {
if err != nil {
return err
}
log.Infof("%v got response: (%s): %s", w, re.Status, string(body))
log.Debugf("%v got response: (%s): %s", w, re.Status, string(body))
return nil
}

View file

@ -34,17 +34,17 @@ func (r *ratioController) String() string {
}
func (r *ratioController) allowRequest() bool {
log.Infof("%v", r)
log.Debugf("%v", r)
t := r.targetRatio()
// This condition answers the question - would we satisfy the target ratio if we allow this request?
e := r.computeRatio(r.allowed+1, r.denied)
if e < t {
r.allowed++
log.Infof("%v allowed", r)
log.Debugf("%v allowed", r)
return true
}
r.denied++
log.Infof("%v denied", r)
log.Debugf("%v denied", r)
return false
}

View file

@ -58,7 +58,7 @@ func (cl *ConnLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if err := cl.acquire(token, amount); err != nil {
log.Infof("limiting request source %s: %v", token, err)
log.Debugf("limiting request source %s: %v", token, err)
cl.errHandler.ServeHTTP(w, r, err)
return
}

View file

@ -5,6 +5,7 @@ package forward
import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"net/http/httptest"
@ -20,6 +21,20 @@ import (
"github.com/vulcand/oxy/utils"
)
// Oxy Logger interface of the internal
type OxyLogger interface {
log.FieldLogger
GetLevel() log.Level
}
type internalLogger struct {
*log.Logger
}
func (i *internalLogger) GetLevel() log.Level {
return i.Level
}
// ReqRewriter can alter request headers and body
type ReqRewriter interface {
Rewrite(r *http.Request)
@ -81,11 +96,20 @@ func Stream(stream bool) optSetter {
// Logger defines the logger the forwarder will use.
//
// It defaults to logrus.StandardLogger(), the global logger used by logrus.
func Logger(l *log.Logger) optSetter {
func Logger(l log.FieldLogger) optSetter {
return func(f *Forwarder) error {
f.log = l
if logger, ok := l.(OxyLogger); ok {
f.log = logger
return nil
}
if logger, ok := l.(*log.Logger); ok {
f.log = &internalLogger{Logger: logger}
return nil
}
return errors.New("the type of the logger must be OxyLogger or logrus.Logger")
}
}
func StateListener(stateListener UrlForwardingStateListener) optSetter {
@ -151,7 +175,7 @@ type httpForwarder struct {
tlsClientConfig *tls.Config
log *log.Logger
log OxyLogger
}
const (
@ -165,7 +189,7 @@ type UrlForwardingStateListener func(*url.URL, int)
// New creates an instance of Forwarder based on the provided list of configuration options
func New(setters ...optSetter) (*Forwarder, error) {
f := &Forwarder{
httpForwarder: &httpForwarder{log: log.StandardLogger()},
httpForwarder: &httpForwarder{log: &internalLogger{Logger: log.StandardLogger()}},
handlerContext: &handlerContext{},
}
for _, s := range setters {
@ -211,7 +235,7 @@ func New(setters ...optSetter) (*Forwarder, error) {
// ServeHTTP decides which forwarder to use based on the specified
// request and delegates to the proper implementation
func (f *Forwarder) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if f.log.Level >= log.DebugLevel {
if f.log.GetLevel() >= log.DebugLevel {
logEntry := f.log.WithField("Request", utils.DumpHttpRequest(req))
logEntry.Debug("vulcand/oxy/forward: begin ServeHttp on request")
defer logEntry.Debug("vulcand/oxy/forward: completed ServeHttp on request")
@ -273,7 +297,7 @@ func (f *httpForwarder) modifyRequest(outReq *http.Request, target *url.URL) {
// serveHTTP forwards websocket traffic
func (f *httpForwarder) serveWebSocket(w http.ResponseWriter, req *http.Request, ctx *handlerContext) {
if f.log.Level >= log.DebugLevel {
if f.log.GetLevel() >= log.DebugLevel {
logEntry := f.log.WithField("Request", utils.DumpHttpRequest(req))
logEntry.Debug("vulcand/oxy/forward/websocket: begin ServeHttp on request")
defer logEntry.Debug("vulcand/oxy/forward/websocket: competed ServeHttp on request")
@ -334,8 +358,8 @@ func (f *httpForwarder) serveWebSocket(w http.ResponseWriter, req *http.Request,
defer underlyingConn.Close()
defer targetConn.Close()
errClient := make(chan error)
errBackend := make(chan error)
errClient := make(chan error, 1)
errBackend := make(chan error, 1)
replicateWebsocketConn := func(dst, src *websocket.Conn, errc chan error) {
for {
msgType, msg, err := src.ReadMessage()
@ -414,7 +438,7 @@ func (f *httpForwarder) copyWebSocketRequest(req *http.Request) (outReq *http.Re
// serveHTTP forwards HTTP traffic using the configured transport
func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ctx *handlerContext) {
if f.log.Level >= log.DebugLevel {
if f.log.GetLevel() >= log.DebugLevel {
logEntry := f.log.WithField("Request", utils.DumpHttpRequest(inReq))
logEntry.Debug("vulcand/oxy/forward/http: begin ServeHttp on request")
defer logEntry.Debug("vulcand/oxy/forward/http: completed ServeHttp on request")
@ -439,14 +463,14 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct
revproxy.ServeHTTP(pw, outReq)
if inReq.TLS != nil {
f.log.Infof("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v tls:version: %x, tls:resume:%t, tls:csuite:%x, tls:server:%v",
f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v tls:version: %x, tls:resume:%t, tls:csuite:%x, tls:server:%v",
inReq.URL, pw.Code, pw.Length, time.Now().UTC().Sub(start),
inReq.TLS.Version,
inReq.TLS.DidResume,
inReq.TLS.CipherSuite,
inReq.TLS.ServerName)
} else {
f.log.Infof("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v",
f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v",
inReq.URL, pw.Code, pw.Length, time.Now().UTC().Sub(start))
}
}

View file

@ -110,7 +110,7 @@ func (tl *TokenLimiter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
if err := tl.consumeRates(req, source, amount); err != nil {
log.Infof("limiting request %v %v, limit: %v", req.Method, req.URL, err)
log.Warnf("limiting request %v %v, limit: %v", req.Method, req.URL, err)
tl.errHandler.ServeHTTP(w, req, err)
return
}

View file

@ -159,7 +159,7 @@ func (rb *Rebalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
cookieUrl, present, err := rb.stickySession.GetBackend(&newReq, rb.Servers())
if err != nil {
log.Infof("vulcand/oxy/roundrobin/rebalancer: error using server from cookie: %v", err)
log.Warnf("vulcand/oxy/roundrobin/rebalancer: error using server from cookie: %v", err)
}
if present {
@ -319,7 +319,7 @@ func (rb *Rebalancer) adjustWeights() {
func (rb *Rebalancer) applyWeights() {
for _, srv := range rb.servers {
log.Infof("upsert server %v, weight %v", srv.url, srv.curWeight)
log.Debugf("upsert server %v, weight %v", srv.url, srv.curWeight)
rb.next.UpsertServer(srv.url, Weight(srv.curWeight))
}
}
@ -331,7 +331,7 @@ func (rb *Rebalancer) setMarkedWeights() bool {
if srv.good {
weight := increase(srv.curWeight)
if weight <= FSMMaxWeight {
log.Infof("increasing weight of %v from %v to %v", srv.url, srv.curWeight, weight)
log.Debugf("increasing weight of %v from %v to %v", srv.url, srv.curWeight, weight)
srv.curWeight = weight
changed = true
}
@ -378,7 +378,7 @@ func (rb *Rebalancer) markServers() bool {
}
}
if len(g) != 0 && len(b) != 0 {
log.Infof("bad: %v good: %v, ratings: %v", b, g, rb.ratings)
log.Debugf("bad: %v good: %v, ratings: %v", b, g, rb.ratings)
}
return len(g) != 0 && len(b) != 0
}
@ -392,7 +392,7 @@ func (rb *Rebalancer) convergeWeights() bool {
}
changed = true
newWeight := decrease(s.origWeight, s.curWeight)
log.Infof("decreasing weight of %v from %v to %v", s.url, s.curWeight, newWeight)
log.Debugf("decreasing weight of %v from %v to %v", s.url, s.curWeight, newWeight)
s.curWeight = newWeight
}
if !changed {

View file

@ -94,7 +94,7 @@ func (r *RoundRobin) ServeHTTP(w http.ResponseWriter, req *http.Request) {
cookieURL, present, err := r.stickySession.GetBackend(&newReq, r.Servers())
if err != nil {
log.Infof("vulcand/oxy/roundrobin/rr: error using server from cookie: %v", err)
log.Warnf("vulcand/oxy/roundrobin/rr: error using server from cookie: %v", err)
}
if present {