// Package ratelimiter implements a rate limiting and traffic shaping middleware with a set of token buckets.
package ratelimiter
import (
"context"
"fmt"
"math"
2018-11-14 09:18:03 +00:00
"net/http"
"time"
"github.com/mailgun/ttlmap"
2022-11-21 17:36:05 +00:00
"github.com/rs/zerolog/log"
2023-02-03 14:24:05 +00:00
"github.com/traefik/traefik/v3/pkg/config/dynamic"
"github.com/traefik/traefik/v3/pkg/middlewares"
"github.com/traefik/traefik/v3/pkg/tracing"
2022-11-21 17:36:05 +00:00
"github.com/vulcand/oxy/v2/utils"
2024-01-08 08:10:06 +00:00
"go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"
2018-11-14 09:18:03 +00:00
)
const (
	// typeName is the middleware type name used when building the logger
	// and reported by GetTracingInformation.
	typeName = "RateLimiter"
	// maxSources is the capacity handed to ttlmap.NewConcurrent when the
	// per-source buckets map is created.
	maxSources = 65536
)
// rateLimiter implements rate limiting and traffic shaping with a set of token buckets;
// one for each traffic source. The same parameters are applied to all the buckets.
2018-11-14 09:18:03 +00:00
type rateLimiter struct {
name string
rate rate.Limit // reqs/s
burst int64
// maxDelay is the maximum duration we're willing to wait for a bucket reservation to become effective, in nanoseconds.
2020-01-08 10:44:04 +00:00
// For now it is somewhat arbitrarily set to 1/(2*rate).
maxDelay time.Duration
// each rate limiter for a given source is stored in the buckets ttlmap.
// To keep this ttlmap constrained in size,
// each ratelimiter is "garbage collected" when it is considered expired.
// It is considered expired after it hasn't been used for ttl seconds.
ttl int
sourceMatcher utils.SourceExtractor
next http.Handler
2019-11-12 10:06:05 +00:00
buckets *ttlmap.TtlMap // actual buckets, keyed by source.
2018-11-14 09:18:03 +00:00
}
// New returns a rate limiter middleware.
func New(ctx context.Context, next http.Handler, config dynamic.RateLimit, name string) (http.Handler, error) {
2022-11-21 17:36:05 +00:00
logger := middlewares.GetLogger(ctx, name, typeName)
logger.Debug().Msg("Creating middleware")
ctxLog := logger.WithContext(ctx)
2018-11-14 09:18:03 +00:00
if config.SourceCriterion == nil ||
config.SourceCriterion.IPStrategy == nil &&
config.SourceCriterion.RequestHeaderName == "" && !config.SourceCriterion.RequestHost {
config.SourceCriterion = &dynamic.SourceCriterion{
IPStrategy: &dynamic.IPStrategy{},
}
2018-11-14 09:18:03 +00:00
}
sourceMatcher, err := middlewares.GetSourceExtractor(ctxLog, config.SourceCriterion)
if err != nil {
return nil, err
2018-11-14 09:18:03 +00:00
}
2019-11-12 10:06:05 +00:00
buckets, err := ttlmap.NewConcurrent(maxSources)
2018-11-14 09:18:03 +00:00
if err != nil {
return nil, err
}
burst := config.Burst
2020-01-08 10:44:04 +00:00
if burst < 1 {
burst = 1
}
2020-01-08 10:44:04 +00:00
period := time.Duration(config.Period)
if period < 0 {
return nil, fmt.Errorf("negative value not valid for period: %v", period)
}
2020-01-08 10:44:04 +00:00
if period == 0 {
period = time.Second
}
2023-01-03 15:16:05 +00:00
// Initialized at rate.Inf to enforce no rate limiting when config.Average == 0
rtl := float64(rate.Inf)
// No need to set any particular value for maxDelay as the reservation's delay
// will be <= 0 in the Inf case (i.e. the average == 0 case).
var maxDelay time.Duration
2023-01-03 15:16:05 +00:00
2020-01-08 10:44:04 +00:00
if config.Average > 0 {
rtl = float64(config.Average*int64(time.Second)) / float64(period)
// maxDelay does not scale well for rates below 1,
// so we just cap it to the corresponding value, i.e. 0.5s, in order to keep the effective rate predictable.
// One alternative would be to switch to a no-reservation mode (Allow() method) whenever we are in such a low rate regime.
if rtl < 1 {
maxDelay = 500 * time.Millisecond
} else {
maxDelay = time.Second / (time.Duration(rtl) * 2)
}
}
// Make the ttl inversely proportional to how often a rate limiter is supposed to see any activity (when maxed out),
// for low rate limiters.
// Otherwise just make it a second for all the high rate limiters.
// Add an extra second in both cases for continuity between the two cases.
ttl := 1
if rtl >= 1 {
ttl++
} else if rtl > 0 {
ttl += int(1 / rtl)
}
return &rateLimiter{
name: name,
2020-01-08 10:44:04 +00:00
rate: rate.Limit(rtl),
burst: burst,
maxDelay: maxDelay,
next: next,
sourceMatcher: sourceMatcher,
buckets: buckets,
ttl: ttl,
}, nil
2018-11-14 09:18:03 +00:00
}
2024-01-08 08:10:06 +00:00
// GetTracingInformation returns, in order, the middleware name, the middleware
// type name, and the span kind (always trace.SpanKindInternal).
func (rl *rateLimiter) GetTracingInformation() (string, string, trace.SpanKind) {
	middlewareName, kind := rl.name, trace.SpanKindInternal

	return middlewareName, typeName, kind
}
2022-11-21 17:36:05 +00:00
func (rl *rateLimiter) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
logger := middlewares.GetLogger(req.Context(), rl.name, typeName)
ctx := logger.WithContext(req.Context())
2022-11-21 17:36:05 +00:00
source, amount, err := rl.sourceMatcher.Extract(req)
if err != nil {
2022-11-21 17:36:05 +00:00
logger.Error().Err(err).Msg("Could not extract source of request")
http.Error(rw, "could not extract source of request", http.StatusInternalServerError)
return
}
if amount != 1 {
2022-11-21 17:36:05 +00:00
logger.Info().Msgf("ignoring token bucket amount > 1: %d", amount)
}
var bucket *rate.Limiter
if rlSource, exists := rl.buckets.Get(source); exists {
bucket = rlSource.(*rate.Limiter)
} else {
bucket = rate.NewLimiter(rl.rate, int(rl.burst))
}
// We Set even in the case where the source already exists,
// because we want to update the expiryTime everytime we get the source,
// as the expiryTime is supposed to reflect the activity (or lack thereof) on that source.
if err := rl.buckets.Set(source, bucket, rl.ttl); err != nil {
2022-11-21 17:36:05 +00:00
logger.Error().Err(err).Msg("Could not insert/update bucket")
2024-01-08 08:10:06 +00:00
tracing.SetStatusErrorf(req.Context(), "Could not insert/update bucket")
2022-11-21 17:36:05 +00:00
http.Error(rw, "could not insert/update bucket", http.StatusInternalServerError)
return
}
res := bucket.Reserve()
if !res.OK() {
2024-01-08 08:10:06 +00:00
tracing.SetStatusErrorf(req.Context(), "No bursty traffic allowed")
2022-11-21 17:36:05 +00:00
http.Error(rw, "No bursty traffic allowed", http.StatusTooManyRequests)
return
}
delay := res.Delay()
if delay > rl.maxDelay {
res.Cancel()
2022-11-21 17:36:05 +00:00
rl.serveDelayError(ctx, rw, delay)
return
}
time.Sleep(delay)
2022-11-21 17:36:05 +00:00
rl.next.ServeHTTP(rw, req)
2018-11-14 09:18:03 +00:00
}
2022-08-31 06:24:08 +00:00
func (rl *rateLimiter) serveDelayError(ctx context.Context, w http.ResponseWriter, delay time.Duration) {
w.Header().Set("Retry-After", fmt.Sprintf("%.0f", math.Ceil(delay.Seconds())))
w.Header().Set("X-Retry-In", delay.String())
w.WriteHeader(http.StatusTooManyRequests)
if _, err := w.Write([]byte(http.StatusText(http.StatusTooManyRequests))); err != nil {
2022-11-21 17:36:05 +00:00
log.Ctx(ctx).Error().Err(err).Msg("Could not serve 429")
}
2018-11-14 09:18:03 +00:00
}