Refactor Exponential Backoff
This commit is contained in:
parent
0a5c9095ac
commit
83a7f10c75
1 changed files with 49 additions and 47 deletions
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||||
"github.com/traefik/traefik/v2/pkg/log"
|
"github.com/traefik/traefik/v2/pkg/log"
|
||||||
"github.com/traefik/traefik/v2/pkg/middlewares"
|
"github.com/traefik/traefik/v2/pkg/middlewares"
|
||||||
|
"github.com/traefik/traefik/v2/pkg/safe"
|
||||||
"github.com/traefik/traefik/v2/pkg/tracing"
|
"github.com/traefik/traefik/v2/pkg/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -37,11 +38,6 @@ type Listener interface {
|
||||||
// each of them about a retry attempt.
|
// each of them about a retry attempt.
|
||||||
type Listeners []Listener
|
type Listeners []Listener
|
||||||
|
|
||||||
// nexter returns the duration to wait before retrying the operation.
|
|
||||||
type nexter interface {
|
|
||||||
NextBackOff() time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// retry is a middleware that retries requests.
|
// retry is a middleware that retries requests.
|
||||||
type retry struct {
|
type retry struct {
|
||||||
attempts int
|
attempts int
|
||||||
|
@ -73,57 +69,63 @@ func (r *retry) GetTracingInformation() (string, ext.SpanKindEnum) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *retry) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
func (r *retry) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||||
// if we might make multiple attempts, swap the body for an io.NopCloser
|
if r.attempts == 1 {
|
||||||
// cf https://github.com/traefik/traefik/issues/1008
|
r.next.ServeHTTP(rw, req)
|
||||||
if r.attempts > 1 {
|
return
|
||||||
body := req.Body
|
|
||||||
defer body.Close()
|
|
||||||
req.Body = io.NopCloser(body)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
closableBody := req.Body
|
||||||
|
defer closableBody.Close()
|
||||||
|
|
||||||
|
// if we might make multiple attempts, swap the body for an io.NopCloser
|
||||||
|
// cf https://github.com/traefik/traefik/issues/1008
|
||||||
|
req.Body = io.NopCloser(closableBody)
|
||||||
|
|
||||||
attempts := 1
|
attempts := 1
|
||||||
backOff := r.newBackOff()
|
|
||||||
currentInterval := 0 * time.Millisecond
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-time.After(currentInterval):
|
|
||||||
|
|
||||||
shouldRetry := attempts < r.attempts
|
operation := func() error {
|
||||||
retryResponseWriter := newResponseWriter(rw, shouldRetry)
|
shouldRetry := attempts < r.attempts
|
||||||
|
retryResponseWriter := newResponseWriter(rw, shouldRetry)
|
||||||
|
|
||||||
// Disable retries when the backend already received request data
|
// Disable retries when the backend already received request data
|
||||||
trace := &httptrace.ClientTrace{
|
trace := &httptrace.ClientTrace{
|
||||||
WroteHeaders: func() {
|
WroteHeaders: func() {
|
||||||
retryResponseWriter.DisableRetries()
|
retryResponseWriter.DisableRetries()
|
||||||
},
|
},
|
||||||
WroteRequest: func(httptrace.WroteRequestInfo) {
|
WroteRequest: func(httptrace.WroteRequestInfo) {
|
||||||
retryResponseWriter.DisableRetries()
|
retryResponseWriter.DisableRetries()
|
||||||
},
|
},
|
||||||
}
|
|
||||||
newCtx := httptrace.WithClientTrace(req.Context(), trace)
|
|
||||||
|
|
||||||
r.next.ServeHTTP(retryResponseWriter, req.WithContext(newCtx))
|
|
||||||
|
|
||||||
if !retryResponseWriter.ShouldRetry() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
currentInterval = backOff.NextBackOff()
|
|
||||||
|
|
||||||
attempts++
|
|
||||||
|
|
||||||
log.FromContext(middlewares.GetLoggerCtx(req.Context(), r.name, typeName)).
|
|
||||||
Debugf("New attempt %d for request: %v", attempts, req.URL)
|
|
||||||
|
|
||||||
r.listener.Retried(req, attempts)
|
|
||||||
|
|
||||||
case <-req.Context().Done():
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
newCtx := httptrace.WithClientTrace(req.Context(), trace)
|
||||||
|
|
||||||
|
r.next.ServeHTTP(retryResponseWriter, req.WithContext(newCtx))
|
||||||
|
|
||||||
|
if !retryResponseWriter.ShouldRetry() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
attempts++
|
||||||
|
|
||||||
|
return fmt.Errorf("attempt %d failed", attempts-1)
|
||||||
|
}
|
||||||
|
|
||||||
|
backOff := backoff.WithContext(r.newBackOff(), req.Context())
|
||||||
|
|
||||||
|
notify := func(err error, d time.Duration) {
|
||||||
|
log.FromContext(middlewares.GetLoggerCtx(req.Context(), r.name, typeName)).
|
||||||
|
Debugf("New attempt %d for request: %v", attempts, req.URL)
|
||||||
|
|
||||||
|
r.listener.Retried(req, attempts)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backOff, notify)
|
||||||
|
if err != nil {
|
||||||
|
log.FromContext(middlewares.GetLoggerCtx(req.Context(), r.name, typeName)).
|
||||||
|
Debugf("Final retry attempt failed: %v", err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *retry) newBackOff() nexter {
|
func (r *retry) newBackOff() backoff.BackOff {
|
||||||
if r.attempts < 2 || r.initialInterval <= 0 {
|
if r.attempts < 2 || r.initialInterval <= 0 {
|
||||||
return &backoff.ZeroBackOff{}
|
return &backoff.ZeroBackOff{}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue