Hash WRR sticky cookies
Co-authored-by: Romain <rtribotte@users.noreply.github.com>
This commit is contained in:
parent
2a7b2ef772
commit
60123a8f3f
2 changed files with 64 additions and 22 deletions
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -34,7 +35,9 @@ type Balancer struct {
|
||||||
stickyCookie *stickyCookie
|
stickyCookie *stickyCookie
|
||||||
wantsHealthCheck bool
|
wantsHealthCheck bool
|
||||||
|
|
||||||
mutex sync.RWMutex
|
handlersMu sync.RWMutex
|
||||||
|
// References all the handlers by name and also by the hashed value of the name.
|
||||||
|
handlerMap map[string]*namedHandler
|
||||||
handlers []*namedHandler
|
handlers []*namedHandler
|
||||||
curDeadline float64
|
curDeadline float64
|
||||||
// status is a record of which child services of the Balancer are healthy, keyed
|
// status is a record of which child services of the Balancer are healthy, keyed
|
||||||
|
@ -51,6 +54,7 @@ type Balancer struct {
|
||||||
func New(sticky *dynamic.Sticky, hc *dynamic.HealthCheck) *Balancer {
|
func New(sticky *dynamic.Sticky, hc *dynamic.HealthCheck) *Balancer {
|
||||||
balancer := &Balancer{
|
balancer := &Balancer{
|
||||||
status: make(map[string]struct{}),
|
status: make(map[string]struct{}),
|
||||||
|
handlerMap: make(map[string]*namedHandler),
|
||||||
wantsHealthCheck: hc != nil,
|
wantsHealthCheck: hc != nil,
|
||||||
}
|
}
|
||||||
if sticky != nil && sticky.Cookie != nil {
|
if sticky != nil && sticky.Cookie != nil {
|
||||||
|
@ -60,6 +64,7 @@ func New(sticky *dynamic.Sticky, hc *dynamic.HealthCheck) *Balancer {
|
||||||
httpOnly: sticky.Cookie.HTTPOnly,
|
httpOnly: sticky.Cookie.HTTPOnly,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return balancer
|
return balancer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,8 +102,8 @@ func (b *Balancer) Pop() interface{} {
|
||||||
// SetStatus sets on the balancer that its given child is now of the given
|
// SetStatus sets on the balancer that its given child is now of the given
|
||||||
// status. balancerName is only needed for logging purposes.
|
// status. balancerName is only needed for logging purposes.
|
||||||
func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) {
|
func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) {
|
||||||
b.mutex.Lock()
|
b.handlersMu.Lock()
|
||||||
defer b.mutex.Unlock()
|
defer b.handlersMu.Unlock()
|
||||||
|
|
||||||
upBefore := len(b.status) > 0
|
upBefore := len(b.status) > 0
|
||||||
|
|
||||||
|
@ -147,8 +152,8 @@ func (b *Balancer) RegisterStatusUpdater(fn func(up bool)) error {
|
||||||
var errNoAvailableServer = errors.New("no available server")
|
var errNoAvailableServer = errors.New("no available server")
|
||||||
|
|
||||||
func (b *Balancer) nextServer() (*namedHandler, error) {
|
func (b *Balancer) nextServer() (*namedHandler, error) {
|
||||||
b.mutex.Lock()
|
b.handlersMu.Lock()
|
||||||
defer b.mutex.Unlock()
|
defer b.handlersMu.Unlock()
|
||||||
|
|
||||||
if len(b.handlers) == 0 {
|
if len(b.handlers) == 0 {
|
||||||
return nil, fmt.Errorf("no servers in the pool")
|
return nil, fmt.Errorf("no servers in the pool")
|
||||||
|
@ -185,25 +190,21 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil && cookie != nil {
|
if err == nil && cookie != nil {
|
||||||
for _, handler := range b.handlers {
|
b.handlersMu.RLock()
|
||||||
if handler.name != cookie.Value {
|
handler, ok := b.handlerMap[cookie.Value]
|
||||||
continue
|
b.handlersMu.RUnlock()
|
||||||
}
|
|
||||||
|
|
||||||
b.mutex.RLock()
|
|
||||||
_, ok := b.status[handler.name]
|
|
||||||
b.mutex.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
// because we already are in the only iteration that matches the cookie, so none
|
|
||||||
// of the following iterations are going to be a match for the cookie anyway.
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if ok && handler != nil {
|
||||||
|
b.handlersMu.RLock()
|
||||||
|
_, isHealthy := b.status[handler.name]
|
||||||
|
b.handlersMu.RUnlock()
|
||||||
|
if isHealthy {
|
||||||
handler.ServeHTTP(w, req)
|
handler.ServeHTTP(w, req)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
server, err := b.nextServer()
|
server, err := b.nextServer()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -216,7 +217,7 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if b.stickyCookie != nil {
|
if b.stickyCookie != nil {
|
||||||
cookie := &http.Cookie{Name: b.stickyCookie.name, Value: server.name, Path: "/", HttpOnly: b.stickyCookie.httpOnly, Secure: b.stickyCookie.secure}
|
cookie := &http.Cookie{Name: b.stickyCookie.name, Value: hash(server.name), Path: "/", HttpOnly: b.stickyCookie.httpOnly, Secure: b.stickyCookie.secure}
|
||||||
http.SetCookie(w, cookie)
|
http.SetCookie(w, cookie)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,9 +238,19 @@ func (b *Balancer) AddService(name string, handler http.Handler, weight *int) {
|
||||||
|
|
||||||
h := &namedHandler{Handler: handler, name: name, weight: float64(w)}
|
h := &namedHandler{Handler: handler, name: name, weight: float64(w)}
|
||||||
|
|
||||||
b.mutex.Lock()
|
b.handlersMu.Lock()
|
||||||
h.deadline = b.curDeadline + 1/h.weight
|
h.deadline = b.curDeadline + 1/h.weight
|
||||||
heap.Push(b, h)
|
heap.Push(b, h)
|
||||||
b.status[name] = struct{}{}
|
b.status[name] = struct{}{}
|
||||||
b.mutex.Unlock()
|
b.handlerMap[name] = h
|
||||||
|
b.handlerMap[hash(name)] = h
|
||||||
|
b.handlersMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func hash(input string) string {
|
||||||
|
hasher := fnv.New64()
|
||||||
|
// We purposely ignore the error because the implementation always returns nil.
|
||||||
|
_, _ = hasher.Write([]byte(input))
|
||||||
|
|
||||||
|
return fmt.Sprintf("%x", hasher.Sum64())
|
||||||
}
|
}
|
||||||
|
|
|
@ -254,6 +254,8 @@ func TestSticky(t *testing.T) {
|
||||||
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
for _, cookie := range recorder.Result().Cookies() {
|
for _, cookie := range recorder.Result().Cookies() {
|
||||||
|
assert.NotContains(t, "test=first", cookie.Value)
|
||||||
|
assert.NotContains(t, "test=second", cookie.Value)
|
||||||
req.AddCookie(cookie)
|
req.AddCookie(cookie)
|
||||||
}
|
}
|
||||||
recorder.ResponseRecorder = httptest.NewRecorder()
|
recorder.ResponseRecorder = httptest.NewRecorder()
|
||||||
|
@ -265,6 +267,35 @@ func TestSticky(t *testing.T) {
|
||||||
assert.Equal(t, 3, recorder.save["second"])
|
assert.Equal(t, 3, recorder.save["second"])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSticky_FallBack(t *testing.T) {
|
||||||
|
balancer := New(&dynamic.Sticky{
|
||||||
|
Cookie: &dynamic.Cookie{Name: "test"},
|
||||||
|
}, nil)
|
||||||
|
|
||||||
|
balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
rw.Header().Set("server", "first")
|
||||||
|
rw.WriteHeader(http.StatusOK)
|
||||||
|
}), Int(1))
|
||||||
|
|
||||||
|
balancer.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
rw.Header().Set("server", "second")
|
||||||
|
rw.WriteHeader(http.StatusOK)
|
||||||
|
}), Int(2))
|
||||||
|
|
||||||
|
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||||
|
req.AddCookie(&http.Cookie{Name: "test", Value: "second"})
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
recorder.ResponseRecorder = httptest.NewRecorder()
|
||||||
|
|
||||||
|
balancer.ServeHTTP(recorder, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, 0, recorder.save["first"])
|
||||||
|
assert.Equal(t, 3, recorder.save["second"])
|
||||||
|
}
|
||||||
|
|
||||||
// TestBalancerBias makes sure that the WRR algorithm spreads elements evenly right from the start,
|
// TestBalancerBias makes sure that the WRR algorithm spreads elements evenly right from the start,
|
||||||
// and that it does not "over-favor" the high-weighted ones with a biased start-up regime.
|
// and that it does not "over-favor" the high-weighted ones with a biased start-up regime.
|
||||||
func TestBalancerBias(t *testing.T) {
|
func TestBalancerBias(t *testing.T) {
|
||||||
|
|
Loading…
Add table
Reference in a new issue