ratelimiter: use correct ttlSeconds value, and always call Set
Co-authored-by: Romain <rtribotte@users.noreply.github.com> Co-authored-by: Daniel Tomcej <daniel.tomcej@gmail.com>
This commit is contained in:
parent
e12630ef06
commit
d9fc775084
1 changed files with 36 additions and 8 deletions
|
@ -30,7 +30,12 @@ type rateLimiter struct {
|
|||
burst int64
|
||||
// maxDelay is the maximum duration we're willing to wait for a bucket reservation to become effective, in nanoseconds.
|
||||
// For now it is somewhat arbitrarily set to 1/(2*rate).
|
||||
maxDelay time.Duration
|
||||
maxDelay time.Duration
|
||||
// each rate limiter for a given source is stored in the buckets ttlmap.
|
||||
// To keep this ttlmap constrained in size,
|
||||
// each ratelimiter is "garbage collected" when it is considered expired.
|
||||
// It is considered expired after it hasn't been used for ttl seconds.
|
||||
ttl int
|
||||
sourceMatcher utils.SourceExtractor
|
||||
next http.Handler
|
||||
|
||||
|
@ -66,12 +71,15 @@ func New(ctx context.Context, next http.Handler, config dynamic.RateLimit, name
|
|||
}
|
||||
|
||||
period := time.Duration(config.Period)
|
||||
if period < 0 {
|
||||
return nil, fmt.Errorf("negative value not valid for period: %v", period)
|
||||
}
|
||||
if period == 0 {
|
||||
period = time.Second
|
||||
}
|
||||
|
||||
// Logically, we should set maxDelay to infinity when config.Average == 0 (because it means no rate limiting),
|
||||
// but since the reservation will give us a delay = 0 anyway in this case, we're good even with any maxDelay >= 0.
|
||||
// if config.Average == 0, in that case,
|
||||
// the value of maxDelay does not matter since the reservation will (buggily) give us a delay of 0 anyway.
|
||||
var maxDelay time.Duration
|
||||
var rtl float64
|
||||
if config.Average > 0 {
|
||||
|
@ -86,6 +94,17 @@ func New(ctx context.Context, next http.Handler, config dynamic.RateLimit, name
|
|||
}
|
||||
}
|
||||
|
||||
// Make the ttl inversely proportional to how often a rate limiter is supposed to see any activity (when maxed out),
|
||||
// for low rate limiters.
|
||||
// Otherwise just make it a second for all the high rate limiters.
|
||||
// Add an extra second in both cases for continuity between the two cases.
|
||||
ttl := 1
|
||||
if rtl >= 1 {
|
||||
ttl++
|
||||
} else if rtl > 0 {
|
||||
ttl += int(1 / rtl)
|
||||
}
|
||||
|
||||
return &rateLimiter{
|
||||
name: name,
|
||||
rate: rate.Limit(rtl),
|
||||
|
@ -94,6 +113,7 @@ func New(ctx context.Context, next http.Handler, config dynamic.RateLimit, name
|
|||
next: next,
|
||||
sourceMatcher: sourceMatcher,
|
||||
buckets: buckets,
|
||||
ttl: ttl,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -121,13 +141,21 @@ func (rl *rateLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
bucket = rlSource.(*rate.Limiter)
|
||||
} else {
|
||||
bucket = rate.NewLimiter(rl.rate, int(rl.burst))
|
||||
if err := rl.buckets.Set(source, bucket, int(rl.maxDelay)*10+1); err != nil {
|
||||
logger.Errorf("could not insert bucket: %v", err)
|
||||
http.Error(w, "could not insert bucket", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// We Set even in the case where the source already exists,
|
||||
// because we want to update the expiryTime everytime we get the source,
|
||||
// as the expiryTime is supposed to reflect the activity (or lack thereof) on that source.
|
||||
if err := rl.buckets.Set(source, bucket, rl.ttl); err != nil {
|
||||
logger.Errorf("could not insert/update bucket: %v", err)
|
||||
http.Error(w, "could not insert/update bucket", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// time/rate is bugged, since a rate.Limiter with a 0 Limit not only allows a Reservation to take place,
|
||||
// but also gives a 0 delay below (because of a division by zero, followed by a multiplication that flips into the negatives),
|
||||
// regardless of the current load.
|
||||
// However, for now we take advantage of this behavior to provide the no-limit ratelimiter when config.Average is 0.
|
||||
res := bucket.Reserve()
|
||||
if !res.OK() {
|
||||
http.Error(w, "No bursty traffic allowed", http.StatusTooManyRequests)
|
||||
|
|
Loading…
Reference in a new issue