Support gRPC healthcheck
This commit is contained in:
parent
df99a9fb57
commit
033fccccc7
14 changed files with 374 additions and 107 deletions
|
@ -154,6 +154,7 @@
|
|||
- "traefik.http.services.service01.loadbalancer.healthcheck.method=foobar"
|
||||
- "traefik.http.services.service01.loadbalancer.healthcheck.port=42"
|
||||
- "traefik.http.services.service01.loadbalancer.healthcheck.scheme=foobar"
|
||||
- "traefik.http.services.service01.loadbalancer.healthcheck.mode=foobar"
|
||||
- "traefik.http.services.service01.loadbalancer.healthcheck.timeout=foobar"
|
||||
- "traefik.http.services.service01.loadbalancer.passhostheader=true"
|
||||
- "traefik.http.services.service01.loadbalancer.responseforwarding.flushinterval=foobar"
|
||||
|
|
|
@ -53,6 +53,7 @@
|
|||
url = "foobar"
|
||||
[http.services.Service01.loadBalancer.healthCheck]
|
||||
scheme = "foobar"
|
||||
mode = "foobar"
|
||||
path = "foobar"
|
||||
method = "foobar"
|
||||
port = 42
|
||||
|
|
|
@ -58,6 +58,7 @@ http:
|
|||
- url: foobar
|
||||
healthCheck:
|
||||
scheme: foobar
|
||||
mode: foobar
|
||||
path: foobar
|
||||
method: foobar
|
||||
port: 42
|
||||
|
|
|
@ -208,6 +208,7 @@
|
|||
| `traefik/http/services/Service01/loadBalancer/healthCheck/hostname` | `foobar` |
|
||||
| `traefik/http/services/Service01/loadBalancer/healthCheck/interval` | `foobar` |
|
||||
| `traefik/http/services/Service01/loadBalancer/healthCheck/method` | `foobar` |
|
||||
| `traefik/http/services/Service01/loadBalancer/healthCheck/mode` | `foobar` |
|
||||
| `traefik/http/services/Service01/loadBalancer/healthCheck/path` | `foobar` |
|
||||
| `traefik/http/services/Service01/loadBalancer/healthCheck/port` | `42` |
|
||||
| `traefik/http/services/Service01/loadBalancer/healthCheck/scheme` | `foobar` |
|
||||
|
|
|
@ -154,6 +154,7 @@
|
|||
"traefik.http.services.service01.loadbalancer.healthcheck.method": "foobar",
|
||||
"traefik.http.services.service01.loadbalancer.healthcheck.port": "42",
|
||||
"traefik.http.services.service01.loadbalancer.healthcheck.scheme": "foobar",
|
||||
"traefik.http.services.service01.loadbalancer.healthcheck.mode": "foobar",
|
||||
"traefik.http.services.service01.loadbalancer.healthcheck.timeout": "foobar",
|
||||
"traefik.http.services.service01.loadbalancer.passhostheader": "true",
|
||||
"traefik.http.services.service01.loadbalancer.responseforwarding.flushinterval": "foobar",
|
||||
|
|
|
@ -316,7 +316,8 @@ On subsequent requests, to keep the session alive with the same server, the clie
|
|||
#### Health Check
|
||||
|
||||
Configure health check to remove unhealthy servers from the load balancing rotation.
|
||||
Traefik will consider your servers healthy as long as they return status codes between `2XX` and `3XX` to the health check requests (carried out every `interval`).
|
||||
Traefik will consider your HTTP(s) servers healthy as long as they return status codes between `2XX` and `3XX` to the health check requests (carried out every `interval`).
|
||||
For gRPC servers, Traefik will consider them healthy as long as they return `SERVING` to [gRPC health check v1](https://github.com/grpc/grpc/blob/master/doc/health-checking.md) requests.
|
||||
|
||||
To propagate status changes (e.g. all servers of this service are down) upwards, HealthCheck must also be enabled on the parent(s) of this service.
|
||||
|
||||
|
@ -324,6 +325,7 @@ Below are the available options for the health check mechanism:
|
|||
|
||||
- `path` (required), defines the server URL path for the health check endpoint .
|
||||
- `scheme` (optional), replaces the server URL `scheme` for the health check endpoint.
|
||||
- `mode` (default: http), if defined to `grpc`, will use the gRPC health check protocol to probe the server.
|
||||
- `hostname` (optional), sets the value of `hostname` in the `Host` header of the health check request.
|
||||
- `port` (optional), replaces the server URL `port` for the health check endpoint.
|
||||
- `interval` (default: 30s), defines the frequency of the health check calls.
|
||||
|
|
|
@ -422,6 +422,7 @@
|
|||
url = "foobar"
|
||||
[http.services.Service0.loadBalancer.healthCheck]
|
||||
scheme = "foobar"
|
||||
mode = "foobar"
|
||||
path = "foobar"
|
||||
port = 42
|
||||
interval = "foobar"
|
||||
|
|
|
@ -213,6 +213,7 @@ func (s *Server) SetDefaults() {
|
|||
// ServerHealthCheck holds the HealthCheck configuration.
|
||||
type ServerHealthCheck struct {
|
||||
Scheme string `json:"scheme,omitempty" toml:"scheme,omitempty" yaml:"scheme,omitempty" export:"true"`
|
||||
Mode string `json:"mode,omitempty" toml:"mode,omitempty" yaml:"mode,omitempty" export:"true"`
|
||||
Path string `json:"path,omitempty" toml:"path,omitempty" yaml:"path,omitempty" export:"true"`
|
||||
Method string `json:"method,omitempty" toml:"method,omitempty" yaml:"method,omitempty" export:"true"`
|
||||
Port int `json:"port,omitempty" toml:"port,omitempty,omitzero" yaml:"port,omitempty" export:"true"`
|
||||
|
@ -229,6 +230,7 @@ type ServerHealthCheck struct {
|
|||
func (h *ServerHealthCheck) SetDefaults() {
|
||||
fr := true
|
||||
h.FollowRedirects = &fr
|
||||
h.Mode = "http"
|
||||
}
|
||||
|
||||
// +k8s:deepcopy-gen=true
|
||||
|
|
|
@ -153,6 +153,7 @@ func TestDecodeConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service0.loadbalancer.healthcheck.method": "foobar",
|
||||
"traefik.http.services.Service0.loadbalancer.healthcheck.port": "42",
|
||||
"traefik.http.services.Service0.loadbalancer.healthcheck.scheme": "foobar",
|
||||
"traefik.http.services.Service0.loadbalancer.healthcheck.mode": "foobar",
|
||||
"traefik.http.services.Service0.loadbalancer.healthcheck.timeout": "foobar",
|
||||
"traefik.http.services.Service0.loadbalancer.healthcheck.followredirects": "true",
|
||||
"traefik.http.services.Service0.loadbalancer.passhostheader": "true",
|
||||
|
@ -169,6 +170,7 @@ func TestDecodeConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.healthcheck.method": "foobar",
|
||||
"traefik.http.services.Service1.loadbalancer.healthcheck.port": "42",
|
||||
"traefik.http.services.Service1.loadbalancer.healthcheck.scheme": "foobar",
|
||||
"traefik.http.services.Service1.loadbalancer.healthcheck.mode": "foobar",
|
||||
"traefik.http.services.Service1.loadbalancer.healthcheck.timeout": "foobar",
|
||||
"traefik.http.services.Service1.loadbalancer.healthcheck.followredirects": "true",
|
||||
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
|
||||
|
@ -650,6 +652,7 @@ func TestDecodeConfiguration(t *testing.T) {
|
|||
},
|
||||
HealthCheck: &dynamic.ServerHealthCheck{
|
||||
Scheme: "foobar",
|
||||
Mode: "foobar",
|
||||
Path: "foobar",
|
||||
Method: "foobar",
|
||||
Port: 42,
|
||||
|
@ -678,6 +681,7 @@ func TestDecodeConfiguration(t *testing.T) {
|
|||
},
|
||||
HealthCheck: &dynamic.ServerHealthCheck{
|
||||
Scheme: "foobar",
|
||||
Mode: "foobar",
|
||||
Path: "foobar",
|
||||
Method: "foobar",
|
||||
Port: 42,
|
||||
|
|
|
@ -19,6 +19,10 @@ import (
|
|||
"github.com/traefik/traefik/v2/pkg/metrics"
|
||||
"github.com/traefik/traefik/v2/pkg/safe"
|
||||
"github.com/vulcand/oxy/roundrobin"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
healthpb "google.golang.org/grpc/health/grpc_health_v1"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -26,6 +30,11 @@ const (
|
|||
serverDown = "DOWN"
|
||||
)
|
||||
|
||||
const (
|
||||
HTTPMode = "http"
|
||||
GRPCMode = "grpc"
|
||||
)
|
||||
|
||||
var (
|
||||
singleton *HealthCheck
|
||||
once sync.Once
|
||||
|
@ -60,6 +69,7 @@ type Options struct {
|
|||
Headers map[string]string
|
||||
Hostname string
|
||||
Scheme string
|
||||
Mode string
|
||||
Path string
|
||||
Method string
|
||||
Port int
|
||||
|
@ -245,9 +255,18 @@ func NewBackendConfig(options Options, backendName string) *BackendConfig {
|
|||
}
|
||||
}
|
||||
|
||||
// checkHealth returns a nil error in case it was successful and otherwise
|
||||
// a non-nil error with a meaningful description why the health check failed.
|
||||
// checkHealth calls the proper health check function depending on the
|
||||
// backend config mode, defaults to HTTP.
|
||||
func checkHealth(serverURL *url.URL, backend *BackendConfig) error {
|
||||
if backend.Options.Mode == GRPCMode {
|
||||
return checkHealthGRPC(serverURL, backend)
|
||||
}
|
||||
return checkHealthHTTP(serverURL, backend)
|
||||
}
|
||||
|
||||
// checkHealthHTTP returns an error with a meaningful description if the health check failed.
|
||||
// Dedicated to HTTP servers.
|
||||
func checkHealthHTTP(serverURL *url.URL, backend *BackendConfig) error {
|
||||
req, err := backend.newRequest(serverURL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create HTTP request: %w", err)
|
||||
|
@ -280,6 +299,60 @@ func checkHealth(serverURL *url.URL, backend *BackendConfig) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// checkHealthGRPC returns an error with a meaningful description if the health check failed.
|
||||
// Dedicated to gRPC servers implementing gRPC Health Checking Protocol v1.
|
||||
func checkHealthGRPC(serverURL *url.URL, backend *BackendConfig) error {
|
||||
u, err := serverURL.Parse(backend.Path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse server URL: %w", err)
|
||||
}
|
||||
|
||||
port := u.Port()
|
||||
if backend.Options.Port != 0 {
|
||||
port = strconv.Itoa(backend.Options.Port)
|
||||
}
|
||||
|
||||
serverAddr := net.JoinHostPort(u.Hostname(), port)
|
||||
|
||||
var opts []grpc.DialOption
|
||||
switch backend.Options.Scheme {
|
||||
case "http", "h2c", "":
|
||||
opts = append(opts, grpc.WithInsecure())
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), backend.Options.Timeout)
|
||||
defer cancel()
|
||||
|
||||
conn, err := grpc.DialContext(ctx, serverAddr, opts...)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return fmt.Errorf("fail to connect to %s within %s: %w", serverAddr, backend.Options.Timeout, err)
|
||||
}
|
||||
return fmt.Errorf("fail to connect to %s: %w", serverAddr, err)
|
||||
}
|
||||
defer func() { _ = conn.Close() }()
|
||||
|
||||
resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{})
|
||||
if err != nil {
|
||||
if stat, ok := status.FromError(err); ok {
|
||||
switch stat.Code() {
|
||||
case codes.Unimplemented:
|
||||
return fmt.Errorf("gRPC server does not implement the health protocol: %w", err)
|
||||
case codes.DeadlineExceeded:
|
||||
return fmt.Errorf("gRPC health check timeout: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("gRPC health check failed: %w", err)
|
||||
}
|
||||
|
||||
if resp.Status != healthpb.HealthCheckResponse_SERVING {
|
||||
return fmt.Errorf("received gRPC status code: %v", resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StatusUpdater should be implemented by a service that, when its status
|
||||
// changes (e.g. all if its children are down), needs to propagate upwards (to
|
||||
// their parent(s)) that change.
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/traefik/traefik/v2/pkg/config/runtime"
|
||||
"github.com/traefik/traefik/v2/pkg/testhelpers"
|
||||
"github.com/vulcand/oxy/roundrobin"
|
||||
healthpb "google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -21,16 +22,12 @@ const (
|
|||
healthCheckTimeout = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
type testHandler struct {
|
||||
done func()
|
||||
healthSequence []int
|
||||
}
|
||||
|
||||
func TestSetBackendsConfiguration(t *testing.T) {
|
||||
testCases := []struct {
|
||||
desc string
|
||||
startHealthy bool
|
||||
healthSequence []int
|
||||
mode string
|
||||
server StartTestServer
|
||||
expectedNumRemovedServers int
|
||||
expectedNumUpsertedServers int
|
||||
expectedGaugeValue float64
|
||||
|
@ -38,7 +35,7 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
{
|
||||
desc: "healthy server staying healthy",
|
||||
startHealthy: true,
|
||||
healthSequence: []int{http.StatusOK},
|
||||
server: newHTTPServer(http.StatusOK),
|
||||
expectedNumRemovedServers: 0,
|
||||
expectedNumUpsertedServers: 0,
|
||||
expectedGaugeValue: 1,
|
||||
|
@ -46,7 +43,7 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
{
|
||||
desc: "healthy server staying healthy (StatusNoContent)",
|
||||
startHealthy: true,
|
||||
healthSequence: []int{http.StatusNoContent},
|
||||
server: newHTTPServer(http.StatusNoContent),
|
||||
expectedNumRemovedServers: 0,
|
||||
expectedNumUpsertedServers: 0,
|
||||
expectedGaugeValue: 1,
|
||||
|
@ -54,7 +51,7 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
{
|
||||
desc: "healthy server staying healthy (StatusPermanentRedirect)",
|
||||
startHealthy: true,
|
||||
healthSequence: []int{http.StatusPermanentRedirect},
|
||||
server: newHTTPServer(http.StatusPermanentRedirect),
|
||||
expectedNumRemovedServers: 0,
|
||||
expectedNumUpsertedServers: 0,
|
||||
expectedGaugeValue: 1,
|
||||
|
@ -62,7 +59,7 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
{
|
||||
desc: "healthy server becoming sick",
|
||||
startHealthy: true,
|
||||
healthSequence: []int{http.StatusServiceUnavailable},
|
||||
server: newHTTPServer(http.StatusServiceUnavailable),
|
||||
expectedNumRemovedServers: 1,
|
||||
expectedNumUpsertedServers: 0,
|
||||
expectedGaugeValue: 0,
|
||||
|
@ -70,7 +67,7 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
{
|
||||
desc: "sick server becoming healthy",
|
||||
startHealthy: false,
|
||||
healthSequence: []int{http.StatusOK},
|
||||
server: newHTTPServer(http.StatusOK),
|
||||
expectedNumRemovedServers: 0,
|
||||
expectedNumUpsertedServers: 1,
|
||||
expectedGaugeValue: 1,
|
||||
|
@ -78,7 +75,7 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
{
|
||||
desc: "sick server staying sick",
|
||||
startHealthy: false,
|
||||
healthSequence: []int{http.StatusServiceUnavailable},
|
||||
server: newHTTPServer(http.StatusServiceUnavailable),
|
||||
expectedNumRemovedServers: 0,
|
||||
expectedNumUpsertedServers: 0,
|
||||
expectedGaugeValue: 0,
|
||||
|
@ -86,7 +83,52 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
{
|
||||
desc: "healthy server toggling to sick and back to healthy",
|
||||
startHealthy: true,
|
||||
healthSequence: []int{http.StatusServiceUnavailable, http.StatusOK},
|
||||
server: newHTTPServer(http.StatusServiceUnavailable, http.StatusOK),
|
||||
expectedNumRemovedServers: 1,
|
||||
expectedNumUpsertedServers: 1,
|
||||
expectedGaugeValue: 1,
|
||||
},
|
||||
{
|
||||
desc: "healthy grpc server staying healthy",
|
||||
mode: "grpc",
|
||||
startHealthy: true,
|
||||
server: newGRPCServer(healthpb.HealthCheckResponse_SERVING),
|
||||
expectedNumRemovedServers: 0,
|
||||
expectedNumUpsertedServers: 0,
|
||||
expectedGaugeValue: 1,
|
||||
},
|
||||
{
|
||||
desc: "healthy grpc server becoming sick",
|
||||
mode: "grpc",
|
||||
startHealthy: true,
|
||||
server: newGRPCServer(healthpb.HealthCheckResponse_NOT_SERVING),
|
||||
expectedNumRemovedServers: 1,
|
||||
expectedNumUpsertedServers: 0,
|
||||
expectedGaugeValue: 0,
|
||||
},
|
||||
{
|
||||
desc: "sick grpc server becoming healthy",
|
||||
mode: "grpc",
|
||||
startHealthy: false,
|
||||
server: newGRPCServer(healthpb.HealthCheckResponse_SERVING),
|
||||
expectedNumRemovedServers: 0,
|
||||
expectedNumUpsertedServers: 1,
|
||||
expectedGaugeValue: 1,
|
||||
},
|
||||
{
|
||||
desc: "sick grpc server staying sick",
|
||||
mode: "grpc",
|
||||
startHealthy: false,
|
||||
server: newGRPCServer(healthpb.HealthCheckResponse_NOT_SERVING),
|
||||
expectedNumRemovedServers: 0,
|
||||
expectedNumUpsertedServers: 0,
|
||||
expectedGaugeValue: 0,
|
||||
},
|
||||
{
|
||||
desc: "healthy grpc server toggling to sick and back to healthy",
|
||||
mode: "grpc",
|
||||
startHealthy: true,
|
||||
server: newGRPCServer(healthpb.HealthCheckResponse_NOT_SERVING, healthpb.HealthCheckResponse_SERVING),
|
||||
expectedNumRemovedServers: 1,
|
||||
expectedNumUpsertedServers: 1,
|
||||
expectedGaugeValue: 1,
|
||||
|
@ -98,22 +140,24 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
t.Run(test.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// The context is passed to the health check and canonically canceled by
|
||||
// the test server once all expected requests have been received.
|
||||
// The context is passed to the health check and
|
||||
// canonically canceled by the test server once all expected requests have been received.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
ts := newTestServer(cancel, test.healthSequence)
|
||||
defer ts.Close()
|
||||
t.Cleanup(cancel)
|
||||
|
||||
serverURL, timeout := test.server.Start(t, cancel)
|
||||
|
||||
lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}}
|
||||
backend := NewBackendConfig(Options{
|
||||
|
||||
options := Options{
|
||||
Mode: test.mode,
|
||||
Path: "/path",
|
||||
Interval: healthCheckInterval,
|
||||
Timeout: healthCheckTimeout,
|
||||
LB: lb,
|
||||
}, "backendName")
|
||||
}
|
||||
backend := NewBackendConfig(options, "backendName")
|
||||
|
||||
serverURL := testhelpers.MustParseURL(ts.URL)
|
||||
if test.startHealthy {
|
||||
lb.servers = append(lb.servers, serverURL)
|
||||
} else {
|
||||
|
@ -121,6 +165,7 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
}
|
||||
|
||||
collectingMetrics := &testhelpers.CollectingGauge{}
|
||||
|
||||
check := HealthCheck{
|
||||
Backends: make(map[string]*BackendConfig),
|
||||
metrics: metricsHealthcheck{serverUpGauge: collectingMetrics},
|
||||
|
@ -134,9 +179,6 @@ func TestSetBackendsConfiguration(t *testing.T) {
|
|||
wg.Done()
|
||||
}()
|
||||
|
||||
// Make test timeout dependent on number of expected requests, health
|
||||
// check interval, and a safety margin.
|
||||
timeout := time.Duration(len(test.healthSequence)*int(healthCheckInterval) + 500)
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
t.Fatal("test did not complete in time")
|
||||
|
@ -453,86 +495,6 @@ func TestBalancers_RemoveServer(t *testing.T) {
|
|||
assert.Equal(t, 0, len(balancer2.Servers()))
|
||||
}
|
||||
|
||||
type testLoadBalancer struct {
|
||||
// RWMutex needed due to parallel test execution: Both the system-under-test
|
||||
// and the test assertions reference the counters.
|
||||
*sync.RWMutex
|
||||
numRemovedServers int
|
||||
numUpsertedServers int
|
||||
servers []*url.URL
|
||||
// options is just to make sure that LBStatusUpdater forwards options on Upsert to its BalancerHandler
|
||||
options []roundrobin.ServerOption
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// noop
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) RemoveServer(u *url.URL) error {
|
||||
lb.Lock()
|
||||
defer lb.Unlock()
|
||||
lb.numRemovedServers++
|
||||
lb.removeServer(u)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error {
|
||||
lb.Lock()
|
||||
defer lb.Unlock()
|
||||
lb.numUpsertedServers++
|
||||
lb.servers = append(lb.servers, u)
|
||||
lb.options = append(lb.options, options...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) Servers() []*url.URL {
|
||||
return lb.servers
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) Options() []roundrobin.ServerOption {
|
||||
return lb.options
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) removeServer(u *url.URL) {
|
||||
var i int
|
||||
var serverURL *url.URL
|
||||
found := false
|
||||
for i, serverURL = range lb.servers {
|
||||
if *serverURL == *u {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
|
||||
lb.servers = append(lb.servers[:i], lb.servers[i+1:]...)
|
||||
}
|
||||
|
||||
func newTestServer(done func(), healthSequence []int) *httptest.Server {
|
||||
handler := &testHandler{
|
||||
done: done,
|
||||
healthSequence: healthSequence,
|
||||
}
|
||||
return httptest.NewServer(handler)
|
||||
}
|
||||
|
||||
// ServeHTTP returns HTTP response codes following a status sequences.
|
||||
// It calls the given 'done' function once all request health indicators have been depleted.
|
||||
func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if len(th.healthSequence) == 0 {
|
||||
panic("received unexpected request")
|
||||
}
|
||||
|
||||
w.WriteHeader(th.healthSequence[0])
|
||||
|
||||
th.healthSequence = th.healthSequence[1:]
|
||||
if len(th.healthSequence) == 0 {
|
||||
th.done()
|
||||
}
|
||||
}
|
||||
|
||||
func TestLBStatusUpdater(t *testing.T) {
|
||||
lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}}
|
||||
svInfo := &runtime.ServiceInfo{}
|
||||
|
|
205
pkg/healthcheck/mock_test.go
Normal file
205
pkg/healthcheck/mock_test.go
Normal file
|
@ -0,0 +1,205 @@
|
|||
package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/traefik/traefik/v2/pkg/testhelpers"
|
||||
"github.com/vulcand/oxy/roundrobin"
|
||||
"google.golang.org/grpc"
|
||||
healthpb "google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
type StartTestServer interface {
|
||||
Start(t *testing.T, done func()) (*url.URL, time.Duration)
|
||||
}
|
||||
|
||||
type Status interface {
|
||||
~int | ~int32
|
||||
}
|
||||
|
||||
type HealthSequence[T Status] struct {
|
||||
sequenceMu sync.Mutex
|
||||
sequence []T
|
||||
}
|
||||
|
||||
func (s *HealthSequence[T]) Pop() T {
|
||||
s.sequenceMu.Lock()
|
||||
defer s.sequenceMu.Unlock()
|
||||
|
||||
stat := s.sequence[0]
|
||||
|
||||
s.sequence = s.sequence[1:]
|
||||
|
||||
return stat
|
||||
}
|
||||
|
||||
func (s *HealthSequence[T]) IsEmpty() bool {
|
||||
s.sequenceMu.Lock()
|
||||
defer s.sequenceMu.Unlock()
|
||||
|
||||
return len(s.sequence) == 0
|
||||
}
|
||||
|
||||
type GRPCServer struct {
|
||||
status HealthSequence[healthpb.HealthCheckResponse_ServingStatus]
|
||||
done func()
|
||||
}
|
||||
|
||||
func newGRPCServer(healthSequence ...healthpb.HealthCheckResponse_ServingStatus) *GRPCServer {
|
||||
gRPCService := &GRPCServer{
|
||||
status: HealthSequence[healthpb.HealthCheckResponse_ServingStatus]{
|
||||
sequence: healthSequence,
|
||||
},
|
||||
}
|
||||
|
||||
return gRPCService
|
||||
}
|
||||
|
||||
func (s *GRPCServer) Check(_ context.Context, _ *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
|
||||
stat := s.status.Pop()
|
||||
if s.status.IsEmpty() {
|
||||
s.done()
|
||||
}
|
||||
|
||||
return &healthpb.HealthCheckResponse{
|
||||
Status: stat,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *GRPCServer) Watch(_ *healthpb.HealthCheckRequest, server healthpb.Health_WatchServer) error {
|
||||
stat := s.status.Pop()
|
||||
if s.status.IsEmpty() {
|
||||
s.done()
|
||||
}
|
||||
|
||||
return server.Send(&healthpb.HealthCheckResponse{
|
||||
Status: stat,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *GRPCServer) Start(t *testing.T, done func()) (*url.URL, time.Duration) {
|
||||
t.Helper()
|
||||
|
||||
listener, err := net.Listen("tcp4", "127.0.0.1:0")
|
||||
assert.NoError(t, err)
|
||||
t.Cleanup(func() { _ = listener.Close() })
|
||||
|
||||
server := grpc.NewServer()
|
||||
t.Cleanup(server.Stop)
|
||||
|
||||
s.done = done
|
||||
|
||||
healthpb.RegisterHealthServer(server, s)
|
||||
|
||||
go func() {
|
||||
err := server.Serve(listener)
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
// Make test timeout dependent on number of expected requests, health check interval, and a safety margin.
|
||||
return testhelpers.MustParseURL("http://" + listener.Addr().String()), time.Duration(len(s.status.sequence)*int(healthCheckInterval) + 500)
|
||||
}
|
||||
|
||||
type HTTPServer struct {
|
||||
status HealthSequence[int]
|
||||
done func()
|
||||
}
|
||||
|
||||
func newHTTPServer(healthSequence ...int) *HTTPServer {
|
||||
handler := &HTTPServer{
|
||||
status: HealthSequence[int]{
|
||||
sequence: healthSequence,
|
||||
},
|
||||
}
|
||||
|
||||
return handler
|
||||
}
|
||||
|
||||
// ServeHTTP returns HTTP response codes following a status sequences.
|
||||
// It calls the given 'done' function once all request health indicators have been depleted.
|
||||
func (s *HTTPServer) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
|
||||
stat := s.status.Pop()
|
||||
|
||||
w.WriteHeader(stat)
|
||||
|
||||
if s.status.IsEmpty() {
|
||||
s.done()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *HTTPServer) Start(t *testing.T, done func()) (*url.URL, time.Duration) {
|
||||
t.Helper()
|
||||
|
||||
s.done = done
|
||||
|
||||
ts := httptest.NewServer(s)
|
||||
t.Cleanup(ts.Close)
|
||||
|
||||
// Make test timeout dependent on number of expected requests, health check interval, and a safety margin.
|
||||
return testhelpers.MustParseURL(ts.URL), time.Duration(len(s.status.sequence)*int(healthCheckInterval) + 500)
|
||||
}
|
||||
|
||||
type testLoadBalancer struct {
|
||||
// RWMutex needed due to parallel test execution: Both the system-under-test
|
||||
// and the test assertions reference the counters.
|
||||
*sync.RWMutex
|
||||
numRemovedServers int
|
||||
numUpsertedServers int
|
||||
servers []*url.URL
|
||||
// options is just to make sure that LBStatusUpdater forwards options on Upsert to its BalancerHandler
|
||||
options []roundrobin.ServerOption
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// noop
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) RemoveServer(u *url.URL) error {
|
||||
lb.Lock()
|
||||
defer lb.Unlock()
|
||||
lb.numRemovedServers++
|
||||
lb.removeServer(u)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error {
|
||||
lb.Lock()
|
||||
defer lb.Unlock()
|
||||
lb.numUpsertedServers++
|
||||
lb.servers = append(lb.servers, u)
|
||||
lb.options = append(lb.options, options...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) Servers() []*url.URL {
|
||||
return lb.servers
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) Options() []roundrobin.ServerOption {
|
||||
return lb.options
|
||||
}
|
||||
|
||||
func (lb *testLoadBalancer) removeServer(u *url.URL) {
|
||||
var i int
|
||||
var serverURL *url.URL
|
||||
found := false
|
||||
for i, serverURL = range lb.servers {
|
||||
if *serverURL == *u {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return
|
||||
}
|
||||
|
||||
lb.servers = append(lb.servers[:i], lb.servers[i+1:]...)
|
||||
}
|
|
@ -48,6 +48,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik/http/services/Service01/loadBalancer/healthCheck/headers/name0": "foobar",
|
||||
"traefik/http/services/Service01/loadBalancer/healthCheck/headers/name1": "foobar",
|
||||
"traefik/http/services/Service01/loadBalancer/healthCheck/scheme": "foobar",
|
||||
"traefik/http/services/Service01/loadBalancer/healthCheck/mode": "foobar",
|
||||
"traefik/http/services/Service01/loadBalancer/healthCheck/followredirects": "true",
|
||||
"traefik/http/services/Service01/loadBalancer/responseForwarding/flushInterval": "foobar",
|
||||
"traefik/http/services/Service01/loadBalancer/passHostHeader": "true",
|
||||
|
@ -642,6 +643,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
},
|
||||
HealthCheck: &dynamic.ServerHealthCheck{
|
||||
Scheme: "foobar",
|
||||
Mode: "foobar",
|
||||
Path: "foobar",
|
||||
Port: 42,
|
||||
Interval: "foobar",
|
||||
|
|
|
@ -360,8 +360,19 @@ func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backe
|
|||
followRedirects = *hc.FollowRedirects
|
||||
}
|
||||
|
||||
mode := healthcheck.HTTPMode
|
||||
switch hc.Mode {
|
||||
case "":
|
||||
mode = healthcheck.HTTPMode
|
||||
case healthcheck.GRPCMode, healthcheck.HTTPMode:
|
||||
mode = hc.Mode
|
||||
default:
|
||||
logger.Errorf("Illegal health check mode for backend '%s'", backend)
|
||||
}
|
||||
|
||||
return &healthcheck.Options{
|
||||
Scheme: hc.Scheme,
|
||||
Mode: mode,
|
||||
Path: hc.Path,
|
||||
Method: hc.Method,
|
||||
Port: hc.Port,
|
||||
|
|
Loading…
Reference in a new issue