2019-08-26 12:20:06 +02:00
|
|
|
// Package ratelimiter implements a rate limiting and traffic shaping middleware with a set of token buckets.
|
2018-11-14 10:18:03 +01:00
|
|
|
package ratelimiter
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2019-08-26 12:20:06 +02:00
|
|
|
"fmt"
|
2018-11-14 10:18:03 +01:00
|
|
|
"net/http"
|
|
|
|
"time"
|
|
|
|
|
2019-08-26 12:20:06 +02:00
|
|
|
"github.com/mailgun/ttlmap"
|
2018-11-14 10:18:03 +01:00
|
|
|
"github.com/opentracing/opentracing-go/ext"
|
2020-09-16 15:46:04 +02:00
|
|
|
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
|
|
|
"github.com/traefik/traefik/v2/pkg/log"
|
|
|
|
"github.com/traefik/traefik/v2/pkg/middlewares"
|
|
|
|
"github.com/traefik/traefik/v2/pkg/tracing"
|
2018-11-14 10:18:03 +01:00
|
|
|
"github.com/vulcand/oxy/utils"
|
2019-08-26 12:20:06 +02:00
|
|
|
"golang.org/x/time/rate"
|
2018-11-14 10:18:03 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
2019-08-26 12:20:06 +02:00
|
|
|
typeName = "RateLimiterType"
|
|
|
|
maxSources = 65536
|
2018-11-14 10:18:03 +01:00
|
|
|
)
|
|
|
|
|
2019-08-26 12:20:06 +02:00
|
|
|
// rateLimiter implements rate limiting and traffic shaping with a set of token buckets;
|
|
|
|
// one for each traffic source. The same parameters are applied to all the buckets.
|
2018-11-14 10:18:03 +01:00
|
|
|
type rateLimiter struct {
|
2019-08-26 12:20:06 +02:00
|
|
|
name string
|
|
|
|
rate rate.Limit // reqs/s
|
|
|
|
burst int64
|
|
|
|
// maxDelay is the maximum duration we're willing to wait for a bucket reservation to become effective, in nanoseconds.
|
2020-01-08 11:44:04 +01:00
|
|
|
// For now it is somewhat arbitrarily set to 1/(2*rate).
|
2019-08-26 12:20:06 +02:00
|
|
|
maxDelay time.Duration
|
|
|
|
sourceMatcher utils.SourceExtractor
|
|
|
|
next http.Handler
|
|
|
|
|
2019-11-12 11:06:05 +01:00
|
|
|
buckets *ttlmap.TtlMap // actual buckets, keyed by source.
|
2018-11-14 10:18:03 +01:00
|
|
|
}
|
|
|
|
|
2019-08-26 12:20:06 +02:00
|
|
|
// New returns a rate limiter middleware.
|
2019-07-10 09:26:04 +02:00
|
|
|
func New(ctx context.Context, next http.Handler, config dynamic.RateLimit, name string) (http.Handler, error) {
|
2019-08-26 12:20:06 +02:00
|
|
|
ctxLog := log.With(ctx, log.Str(log.MiddlewareName, name), log.Str(log.MiddlewareType, typeName))
|
|
|
|
log.FromContext(ctxLog).Debug("Creating middleware")
|
2018-11-14 10:18:03 +01:00
|
|
|
|
2019-08-26 12:20:06 +02:00
|
|
|
if config.SourceCriterion == nil ||
|
|
|
|
config.SourceCriterion.IPStrategy == nil &&
|
|
|
|
config.SourceCriterion.RequestHeaderName == "" && !config.SourceCriterion.RequestHost {
|
|
|
|
config.SourceCriterion = &dynamic.SourceCriterion{
|
|
|
|
IPStrategy: &dynamic.IPStrategy{},
|
|
|
|
}
|
2018-11-14 10:18:03 +01:00
|
|
|
}
|
|
|
|
|
2019-08-26 12:20:06 +02:00
|
|
|
sourceMatcher, err := middlewares.GetSourceExtractor(ctxLog, config.SourceCriterion)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2018-11-14 10:18:03 +01:00
|
|
|
}
|
|
|
|
|
2019-11-12 11:06:05 +01:00
|
|
|
buckets, err := ttlmap.NewConcurrent(maxSources)
|
2018-11-14 10:18:03 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-08-26 12:20:06 +02:00
|
|
|
|
|
|
|
burst := config.Burst
|
2020-01-08 11:44:04 +01:00
|
|
|
if burst < 1 {
|
2019-08-26 12:20:06 +02:00
|
|
|
burst = 1
|
|
|
|
}
|
|
|
|
|
2020-01-08 11:44:04 +01:00
|
|
|
period := time.Duration(config.Period)
|
|
|
|
if period == 0 {
|
|
|
|
period = time.Second
|
|
|
|
}
|
|
|
|
|
|
|
|
// Logically, we should set maxDelay to infinity when config.Average == 0 (because it means no rate limiting),
|
2019-08-26 12:20:06 +02:00
|
|
|
// but since the reservation will give us a delay = 0 anyway in this case, we're good even with any maxDelay >= 0.
|
|
|
|
var maxDelay time.Duration
|
2020-01-08 11:44:04 +01:00
|
|
|
var rtl float64
|
|
|
|
if config.Average > 0 {
|
|
|
|
rtl = float64(config.Average*int64(time.Second)) / float64(period)
|
|
|
|
// maxDelay does not scale well for rates below 1,
|
|
|
|
// so we just cap it to the corresponding value, i.e. 0.5s, in order to keep the effective rate predictable.
|
|
|
|
// One alternative would be to switch to a no-reservation mode (Allow() method) whenever we are in such a low rate regime.
|
|
|
|
if rtl < 1 {
|
|
|
|
maxDelay = 500 * time.Millisecond
|
|
|
|
} else {
|
|
|
|
maxDelay = time.Second / (time.Duration(rtl) * 2)
|
|
|
|
}
|
2019-08-26 12:20:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
return &rateLimiter{
|
|
|
|
name: name,
|
2020-01-08 11:44:04 +01:00
|
|
|
rate: rate.Limit(rtl),
|
2019-08-26 12:20:06 +02:00
|
|
|
burst: burst,
|
|
|
|
maxDelay: maxDelay,
|
|
|
|
next: next,
|
|
|
|
sourceMatcher: sourceMatcher,
|
|
|
|
buckets: buckets,
|
|
|
|
}, nil
|
2018-11-14 10:18:03 +01:00
|
|
|
}
|
|
|
|
|
2019-08-26 12:20:06 +02:00
|
|
|
func (rl *rateLimiter) GetTracingInformation() (string, ext.SpanKindEnum) {
|
|
|
|
return rl.name, tracing.SpanKindNoneEnum
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rl *rateLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
2019-09-13 19:28:04 +02:00
|
|
|
ctx := middlewares.GetLoggerCtx(r.Context(), rl.name, typeName)
|
|
|
|
logger := log.FromContext(ctx)
|
2019-08-26 12:20:06 +02:00
|
|
|
|
|
|
|
source, amount, err := rl.sourceMatcher.Extract(r)
|
|
|
|
if err != nil {
|
|
|
|
logger.Errorf("could not extract source of request: %v", err)
|
|
|
|
http.Error(w, "could not extract source of request", http.StatusInternalServerError)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if amount != 1 {
|
|
|
|
logger.Infof("ignoring token bucket amount > 1: %d", amount)
|
|
|
|
}
|
|
|
|
|
|
|
|
var bucket *rate.Limiter
|
|
|
|
if rlSource, exists := rl.buckets.Get(source); exists {
|
|
|
|
bucket = rlSource.(*rate.Limiter)
|
|
|
|
} else {
|
|
|
|
bucket = rate.NewLimiter(rl.rate, int(rl.burst))
|
|
|
|
if err := rl.buckets.Set(source, bucket, int(rl.maxDelay)*10+1); err != nil {
|
|
|
|
logger.Errorf("could not insert bucket: %v", err)
|
|
|
|
http.Error(w, "could not insert bucket", http.StatusInternalServerError)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
res := bucket.Reserve()
|
|
|
|
if !res.OK() {
|
|
|
|
http.Error(w, "No bursty traffic allowed", http.StatusTooManyRequests)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
delay := res.Delay()
|
|
|
|
if delay > rl.maxDelay {
|
|
|
|
res.Cancel()
|
2019-09-13 19:28:04 +02:00
|
|
|
rl.serveDelayError(ctx, w, r, delay)
|
2019-08-26 12:20:06 +02:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
time.Sleep(delay)
|
|
|
|
rl.next.ServeHTTP(w, r)
|
2018-11-14 10:18:03 +01:00
|
|
|
}
|
|
|
|
|
2019-09-13 19:28:04 +02:00
|
|
|
func (rl *rateLimiter) serveDelayError(ctx context.Context, w http.ResponseWriter, r *http.Request, delay time.Duration) {
|
2019-08-26 12:20:06 +02:00
|
|
|
w.Header().Set("Retry-After", fmt.Sprintf("%.0f", delay.Seconds()))
|
|
|
|
w.Header().Set("X-Retry-In", delay.String())
|
|
|
|
w.WriteHeader(http.StatusTooManyRequests)
|
|
|
|
|
|
|
|
if _, err := w.Write([]byte(http.StatusText(http.StatusTooManyRequests))); err != nil {
|
2019-09-13 19:28:04 +02:00
|
|
|
log.FromContext(ctx).Errorf("could not serve 429: %v", err)
|
2019-08-26 12:20:06 +02:00
|
|
|
}
|
2018-11-14 10:18:03 +01:00
|
|
|
}
|