From 60123a8f3ff380b4dfde89877c329464607980a2 Mon Sep 17 00:00:00 2001 From: Baptiste Mayelle Date: Tue, 2 Jan 2024 15:52:05 +0100 Subject: [PATCH] Hash WRR sticky cookies Co-authored-by: Romain --- pkg/server/service/loadbalancer/wrr/wrr.go | 55 +++++++++++-------- .../service/loadbalancer/wrr/wrr_test.go | 31 +++++++++++ 2 files changed, 64 insertions(+), 22 deletions(-) diff --git a/pkg/server/service/loadbalancer/wrr/wrr.go b/pkg/server/service/loadbalancer/wrr/wrr.go index d7261d95b..95c374842 100644 --- a/pkg/server/service/loadbalancer/wrr/wrr.go +++ b/pkg/server/service/loadbalancer/wrr/wrr.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "hash/fnv" "net/http" "sync" @@ -34,7 +35,9 @@ type Balancer struct { stickyCookie *stickyCookie wantsHealthCheck bool - mutex sync.RWMutex + handlersMu sync.RWMutex + // References all the handlers by name and also by the hashed value of the name. + handlerMap map[string]*namedHandler handlers []*namedHandler curDeadline float64 // status is a record of which child services of the Balancer are healthy, keyed @@ -51,6 +54,7 @@ type Balancer struct { func New(sticky *dynamic.Sticky, hc *dynamic.HealthCheck) *Balancer { balancer := &Balancer{ status: make(map[string]struct{}), + handlerMap: make(map[string]*namedHandler), wantsHealthCheck: hc != nil, } if sticky != nil && sticky.Cookie != nil { @@ -60,6 +64,7 @@ func New(sticky *dynamic.Sticky, hc *dynamic.HealthCheck) *Balancer { httpOnly: sticky.Cookie.HTTPOnly, } } + return balancer } @@ -97,8 +102,8 @@ func (b *Balancer) Pop() interface{} { // SetStatus sets on the balancer that its given child is now of the given // status. balancerName is only needed for logging purposes. func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) { - b.mutex.Lock() - defer b.mutex.Unlock() + b.handlersMu.Lock() + defer b.handlersMu.Unlock() upBefore := len(b.status) > 0 @@ -147,8 +152,8 @@ func (b *Balancer) RegisterStatusUpdater(fn func(up bool)) error { var errNoAvailableServer = errors.New("no available server") func (b *Balancer) nextServer() (*namedHandler, error) { - b.mutex.Lock() - defer b.mutex.Unlock() + b.handlersMu.Lock() + defer b.handlersMu.Unlock() if len(b.handlers) == 0 { return nil, fmt.Errorf("no servers in the pool") @@ -185,22 +190,18 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } if err == nil && cookie != nil { - for _, handler := range b.handlers { - if handler.name != cookie.Value { - continue - } + b.handlersMu.RLock() + handler, ok := b.handlerMap[cookie.Value] + b.handlersMu.RUnlock() - b.mutex.RLock() - _, ok := b.status[handler.name] - b.mutex.RUnlock() - if !ok { - // because we already are in the only iteration that matches the cookie, so none - // of the following iterations are going to be a match for the cookie anyway. - break + if ok && handler != nil { + b.handlersMu.RLock() + _, isHealthy := b.status[handler.name] + b.handlersMu.RUnlock() + if isHealthy { + handler.ServeHTTP(w, req) + return } - - handler.ServeHTTP(w, req) - return } } } @@ -216,7 +217,7 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } if b.stickyCookie != nil { - cookie := &http.Cookie{Name: b.stickyCookie.name, Value: server.name, Path: "/", HttpOnly: b.stickyCookie.httpOnly, Secure: b.stickyCookie.secure} + cookie := &http.Cookie{Name: b.stickyCookie.name, Value: hash(server.name), Path: "/", HttpOnly: b.stickyCookie.httpOnly, Secure: b.stickyCookie.secure} http.SetCookie(w, cookie) } @@ -237,9 +238,19 @@ func (b *Balancer) AddService(name string, handler http.Handler, weight *int) { h := &namedHandler{Handler: handler, name: name, weight: float64(w)} - b.mutex.Lock() + b.handlersMu.Lock() h.deadline = b.curDeadline + 1/h.weight heap.Push(b, h) b.status[name] = struct{}{} - b.mutex.Unlock() + b.handlerMap[name] = h + b.handlerMap[hash(name)] = h + b.handlersMu.Unlock() +} + +func hash(input string) string { + hasher := fnv.New64() + // We purposely ignore the error because the implementation always returns nil. + _, _ = hasher.Write([]byte(input)) + + return fmt.Sprintf("%x", hasher.Sum64()) } diff --git a/pkg/server/service/loadbalancer/wrr/wrr_test.go b/pkg/server/service/loadbalancer/wrr/wrr_test.go index ac5f7e15b..7328fd98d 100644 --- a/pkg/server/service/loadbalancer/wrr/wrr_test.go +++ b/pkg/server/service/loadbalancer/wrr/wrr_test.go @@ -254,6 +254,8 @@ func TestSticky(t *testing.T) { req := httptest.NewRequest(http.MethodGet, "/", nil) for i := 0; i < 3; i++ { for _, cookie := range recorder.Result().Cookies() { + assert.NotContains(t, "test=first", cookie.Value) + assert.NotContains(t, "test=second", cookie.Value) req.AddCookie(cookie) } recorder.ResponseRecorder = httptest.NewRecorder() @@ -265,6 +267,35 @@ func TestSticky(t *testing.T) { assert.Equal(t, 3, recorder.save["second"]) } +func TestSticky_FallBack(t *testing.T) { + balancer := New(&dynamic.Sticky{ + Cookie: &dynamic.Cookie{Name: "test"}, + }, nil) + + balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("server", "first") + rw.WriteHeader(http.StatusOK) + }), Int(1)) + + balancer.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("server", "second") + rw.WriteHeader(http.StatusOK) + }), Int(2)) + + recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} + + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.AddCookie(&http.Cookie{Name: "test", Value: "second"}) + for i := 0; i < 3; i++ { + recorder.ResponseRecorder = httptest.NewRecorder() + + balancer.ServeHTTP(recorder, req) + } + + assert.Equal(t, 0, recorder.save["first"]) + assert.Equal(t, 3, recorder.save["second"]) +} + // TestBalancerBias makes sure that the WRR algorithm spreads elements evenly right from the start, // and that it does not "over-favor" the high-weighted ones with a biased start-up regime. func TestBalancerBias(t *testing.T) {