From 838a8e18d3c80771783794865bea3240a9d5f8c3 Mon Sep 17 00:00:00 2001 From: mpl Date: Fri, 25 Jun 2021 21:08:11 +0200 Subject: [PATCH] healthcheck: add support at the load-balancers of services level Co-authored-by: Dmitry Sharshakov Co-authored-by: Julien Salleyron Co-authored-by: Jean-Baptiste Doumenjou <925513+jbdoumenjou@users.noreply.github.com> Co-authored-by: Romain Co-authored-by: Tom Moulard --- .../reference/dynamic-configuration/file.toml | 3 + .../reference/dynamic-configuration/file.yaml | 2 + .../reference/dynamic-configuration/kv-ref.md | 2 + docs/content/routing/services/index.md | 150 ++++++++++ .../fixtures/healthcheck/propagate.toml | 117 ++++++++ .../healthcheck/propagate_no_healthcheck.toml | 38 +++ .../healthcheck/reload_with_healthcheck.toml | 46 +++ .../reload_without_healthcheck.toml | 46 +++ integration/healthcheck_test.go | 268 ++++++++++++++++++ integration/resources/compose/healthcheck.yml | 6 + pkg/anonymize/anonymize_config_test.go | 2 +- pkg/config/dynamic/http_config.go | 28 +- pkg/config/dynamic/zz_generated.deepcopy.go | 52 +++- pkg/config/label/label_test.go | 8 +- pkg/config/runtime/runtime_test.go | 6 +- pkg/healthcheck/healthcheck.go | 117 ++++++-- pkg/healthcheck/healthcheck_test.go | 2 +- .../empty_backend_handler.go | 23 +- .../empty_backend_handler_test.go | 4 + pkg/provider/kv/kv_test.go | 2 +- pkg/server/router/router_test.go | 2 +- pkg/server/server_entrypoint_tcp_test.go | 4 +- .../service/loadbalancer/mirror/mirror.go | 41 ++- .../loadbalancer/mirror/mirror_test.go | 12 +- pkg/server/service/loadbalancer/wrr/wrr.go | 123 ++++++-- .../service/loadbalancer/wrr/wrr_test.go | 162 ++++++++++- pkg/server/service/service.go | 46 +-- pkg/server/service/service_test.go | 4 +- 28 files changed, 1196 insertions(+), 120 deletions(-) create mode 100644 integration/fixtures/healthcheck/propagate.toml create mode 100644 integration/fixtures/healthcheck/propagate_no_healthcheck.toml create mode 100644 integration/fixtures/healthcheck/reload_with_healthcheck.toml create mode 100644 integration/fixtures/healthcheck/reload_without_healthcheck.toml diff --git a/docs/content/reference/dynamic-configuration/file.toml b/docs/content/reference/dynamic-configuration/file.toml index 3fbd66122..c207be966 100644 --- a/docs/content/reference/dynamic-configuration/file.toml +++ b/docs/content/reference/dynamic-configuration/file.toml @@ -69,6 +69,8 @@ service = "foobar" maxBodySize = 42 + [http.services.Service02.mirroring.healthCheck] + [[http.services.Service02.mirroring.mirrors]] name = "foobar" percent = 42 @@ -78,6 +80,7 @@ percent = 42 [http.services.Service03] [http.services.Service03.weighted] + [http.services.Service03.weighted.healthCheck] [[http.services.Service03.weighted.services]] name = "foobar" diff --git a/docs/content/reference/dynamic-configuration/file.yaml b/docs/content/reference/dynamic-configuration/file.yaml index 72f87ff0d..b060e6fb1 100644 --- a/docs/content/reference/dynamic-configuration/file.yaml +++ b/docs/content/reference/dynamic-configuration/file.yaml @@ -75,6 +75,7 @@ http: mirroring: service: foobar maxBodySize: 42 + healthCheck: {} mirrors: - name: foobar percent: 42 @@ -82,6 +83,7 @@ http: percent: 42 Service03: weighted: + healthCheck: {} services: - name: foobar weight: 42 diff --git a/docs/content/reference/dynamic-configuration/kv-ref.md b/docs/content/reference/dynamic-configuration/kv-ref.md index 170798ed0..2650202a7 100644 --- a/docs/content/reference/dynamic-configuration/kv-ref.md +++ 
b/docs/content/reference/dynamic-configuration/kv-ref.md
@@ -208,12 +208,14 @@
| `traefik/http/services/Service01/loadBalancer/sticky/cookie/name` | `foobar` |
| `traefik/http/services/Service01/loadBalancer/sticky/cookie/sameSite` | `foobar` |
| `traefik/http/services/Service01/loadBalancer/sticky/cookie/secure` | `true` |
+| `traefik/http/services/Service02/mirroring/healthCheck` | `` |
| `traefik/http/services/Service02/mirroring/maxBodySize` | `42` |
| `traefik/http/services/Service02/mirroring/mirrors/0/name` | `foobar` |
| `traefik/http/services/Service02/mirroring/mirrors/0/percent` | `42` |
| `traefik/http/services/Service02/mirroring/mirrors/1/name` | `foobar` |
| `traefik/http/services/Service02/mirroring/mirrors/1/percent` | `42` |
| `traefik/http/services/Service02/mirroring/service` | `foobar` |
+| `traefik/http/services/Service03/weighted/healthCheck` | `` |
| `traefik/http/services/Service03/weighted/services/0/name` | `foobar` |
| `traefik/http/services/Service03/weighted/services/0/weight` | `42` |
| `traefik/http/services/Service03/weighted/services/1/name` | `foobar` |
diff --git a/docs/content/routing/services/index.md b/docs/content/routing/services/index.md
index 2520bf855..c11e8cdd3 100644
--- a/docs/content/routing/services/index.md
+++ b/docs/content/routing/services/index.md
@@ -313,6 +313,8 @@ On subsequent requests, to keep the session alive with the same server, the clie
Configure health check to remove unhealthy servers from the load balancing rotation.
Traefik will consider your servers healthy as long as they return status codes between `2XX` and `3XX` to the health check requests (carried out every `interval`).
+To propagate status changes (e.g. all servers of this service are down) upwards, HealthCheck must also be enabled on the parent(s) of this service.
+
Below are the available options for the health check mechanism:

- `path` is appended to the server URL to set the health check endpoint.
@@ -900,6 +902,84 @@ http:
        url = "http://private-ip-server-2/"
    ```
+
+#### Health Check
+
+HealthCheck enables automatic self-healthcheck for this service: whenever one
+of its children is reported as down, this service becomes aware of it, and
+ignores the down child when running the load-balancing algorithm. In addition,
+if the parent of this service also has HealthCheck enabled, this service
+reports any status change to its parent.
+
+!!! info "All or nothing"
+
+    If HealthCheck is enabled for a given service, but any of its descendants does
+    not have it enabled, the creation of the service will fail.
+
+    HealthCheck on Weighted services can currently only be defined with the [File](../../providers/file.md) provider.
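+
+For example, a minimal configuration like the following sketch (the service
+names are illustrative) is rejected at creation time, because `app` enables
+HealthCheck while its child `appv1` does not:
+
+```yaml tab="YAML"
+## Dynamic configuration
+http:
+  services:
+    app:
+      weighted:
+        healthCheck: {}
+        services:
+          - name: appv1
+            weight: 1
+
+    appv1:
+      loadBalancer:
+        # No healthCheck defined here: the creation of `app` fails.
+        servers:
+          - url: "http://private-ip-server-1/"
+```
+
+The following configuration enables HealthCheck at every level, and is
+therefore valid: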
+
+```yaml tab="YAML"
+## Dynamic configuration
+http:
+  services:
+    app:
+      weighted:
+        healthCheck: {}
+        services:
+          - name: appv1
+            weight: 3
+          - name: appv2
+            weight: 1
+
+    appv1:
+      loadBalancer:
+        healthCheck:
+          path: /status
+          interval: 10s
+          timeout: 3s
+        servers:
+          - url: "http://private-ip-server-1/"
+
+    appv2:
+      loadBalancer:
+        healthCheck:
+          path: /status
+          interval: 10s
+          timeout: 3s
+        servers:
+          - url: "http://private-ip-server-2/"
+```
+
+```toml tab="TOML"
+## Dynamic configuration
+[http.services]
+  [http.services.app]
+    [http.services.app.weighted.healthCheck]
+    [[http.services.app.weighted.services]]
+      name = "appv1"
+      weight = 3
+    [[http.services.app.weighted.services]]
+      name = "appv2"
+      weight = 1
+
+  [http.services.appv1]
+    [http.services.appv1.loadBalancer]
+      [http.services.appv1.loadBalancer.healthCheck]
+        path = "/status"
+        interval = "10s"
+        timeout = "3s"
+      [[http.services.appv1.loadBalancer.servers]]
+        url = "http://private-ip-server-1/"
+
+  [http.services.appv2]
+    [http.services.appv2.loadBalancer]
+      [http.services.appv2.loadBalancer.healthCheck]
+        path = "/status"
+        interval = "10s"
+        timeout = "3s"
+      [[http.services.appv2.loadBalancer.servers]]
+        url = "http://private-ip-server-2/"
+```
+
### Mirroring (service)

The mirroring is able to mirror requests sent to a service to other services.
@@ -961,6 +1041,76 @@ http:
       url = "http://private-ip-server-2/"
   ```
+
+#### Health Check
+
+HealthCheck enables automatic self-healthcheck for this service: if the
+main handler of the service becomes unreachable, the information is propagated
+upwards to its parent.
+
+!!! info "All or nothing"
+
+    If HealthCheck is enabled for a given service, but any of its descendants does
+    not have it enabled, the creation of the service will fail.
+
+    HealthCheck on Mirroring services can currently only be defined with the [File](../../providers/file.md) provider.
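+
+Note that the status of a Mirroring service follows the status of its main
+`service` only: the mirrors receive a copy of the traffic, but their own health
+is not watched and does not propagate upwards. Status propagation also composes
+across service kinds, as in the following sketch (the service names are
+illustrative), where a status change of `svc-main` reaches the `parent`
+Weighted service through the `mirrored` Mirroring service:
+
+```yaml tab="YAML"
+## Dynamic configuration
+http:
+  services:
+    parent:
+      weighted:
+        healthCheck: {}
+        services:
+          - name: mirrored
+            weight: 1
+
+    mirrored:
+      mirroring:
+        healthCheck: {}
+        service: svc-main
+        mirrors:
+          - name: svc-mirror
+            percent: 10
+
+    svc-main:
+      loadBalancer:
+        healthCheck:
+          path: /status
+          interval: 10s
+          timeout: 3s
+        servers:
+          - url: "http://private-ip-server-1/"
+
+    # svc-mirror is only a mirror: its health is not reported upwards.
+    svc-mirror:
+      loadBalancer:
+        servers:
+          - url: "http://private-ip-server-2/"
+```
+
+The following is a basic Mirroring service with HealthCheck enabled: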
+
+```yaml tab="YAML"
+## Dynamic configuration
+http:
+  services:
+    mirrored-api:
+      mirroring:
+        healthCheck: {}
+        service: appv1
+        mirrors:
+          - name: appv2
+            percent: 10
+
+    appv1:
+      loadBalancer:
+        healthCheck:
+          path: /status
+          interval: 10s
+          timeout: 3s
+        servers:
+          - url: "http://private-ip-server-1/"
+
+    appv2:
+      loadBalancer:
+        servers:
+          - url: "http://private-ip-server-2/"
+```
+
+```toml tab="TOML"
+## Dynamic configuration
+[http.services]
+  [http.services.mirrored-api]
+    [http.services.mirrored-api.mirroring]
+      service = "appv1"
+      [http.services.mirrored-api.mirroring.healthCheck]
+      [[http.services.mirrored-api.mirroring.mirrors]]
+        name = "appv2"
+        percent = 10
+
+  [http.services.appv1]
+    [http.services.appv1.loadBalancer]
+      [http.services.appv1.loadBalancer.healthCheck]
+        path = "/status"
+        interval = "10s"
+        timeout = "3s"
+      [[http.services.appv1.loadBalancer.servers]]
+        url = "http://private-ip-server-1/"
+
+  [http.services.appv2]
+    [http.services.appv2.loadBalancer]
+      [[http.services.appv2.loadBalancer.servers]]
+        url = "http://private-ip-server-2/"
+```
+
## Configuring TCP Services

### General

diff --git a/integration/fixtures/healthcheck/propagate.toml b/integration/fixtures/healthcheck/propagate.toml
new file mode 100644
index 000000000..a5f11c2b4
--- /dev/null
+++ b/integration/fixtures/healthcheck/propagate.toml
@@ -0,0 +1,117 @@
+[global]
+  checkNewVersion = false
+  sendAnonymousUsage = false
+
+[log]
+  level = "DEBUG"
+
+[entryPoints]
+  [entryPoints.web]
+    address = ":8000"
+
+[api]
+  insecure = true
+
+[providers.file]
+  filename = "{{ .SelfFilename }}"
+
+## dynamic configuration ##
+
+[http.routers]
+  [http.routers.wsp-router-1]
+    service = "wsp-service1"
+    rule = "Host(`root.localhost`)"
+  [http.routers.wsp-router-2]
+    service = "wsp-service2"
+    rule = "Host(`foo.localhost`)"
+  [http.routers.wsp-router-3]
+    service = "wsp-service3"
+    rule = "Host(`bar.localhost`)"
+
+[http.services]
+  [http.services.wsp-service1.weighted]
+    [http.services.wsp-service1.weighted.healthcheck]
+    [[http.services.wsp-service1.weighted.services]]
+      name = "wsp12"
+      weight = 1
+    [[http.services.wsp-service1.weighted.services]]
+      name = "wsp34"
+      weight = 1
+  [http.services.wsp-service2.weighted]
+    [http.services.wsp-service2.weighted.healthcheck]
+    [[http.services.wsp-service2.weighted.services]]
+      name = "wsp13"
+      weight = 1
+    [[http.services.wsp-service2.weighted.services]]
+      name = "wsp12"
+      weight = 1
+  [http.services.wsp-service3.weighted]
+    [http.services.wsp-service3.weighted.healthcheck]
+    [[http.services.wsp-service3.weighted.services]]
+      name = "wsp13"
+      weight = 1
+    [[http.services.wsp-service3.weighted.services]]
+      name = "wsp12b"
+      weight = 1
+  [http.services.wsp12.weighted]
+    [http.services.wsp12.weighted.healthcheck]
+    [[http.services.wsp12.weighted.services]]
+      name = "wsp1"
+      weight = 1
+    [[http.services.wsp12.weighted.services]]
+      name = "wsp2"
+      weight = 1
+  [http.services.wsp34.weighted]
+    [http.services.wsp34.weighted.healthcheck]
+    [[http.services.wsp34.weighted.services]]
+      name = "wsp3"
+      weight = 1
+    [[http.services.wsp34.weighted.services]]
+      name = "wsp4"
+      weight = 1
+  [http.services.wsp13.weighted]
+    [http.services.wsp13.weighted.healthcheck]
+    [[http.services.wsp13.weighted.services]]
+      name = "wsp1"
+      weight = 1
+    [[http.services.wsp13.weighted.services]]
+      name = "wsp3"
+      weight = 1
+
+  [http.services.wsp1.loadBalancer]
+
[http.services.wsp1.loadBalancer.healthcheck] + path = "/health" + interval = "1s" + timeout = "0.9s" + [[http.services.wsp1.loadBalancer.servers]] + url = "http://{{.Server1}}:80" + [http.services.wsp2.loadBalancer] + [http.services.wsp2.loadBalancer.healthcheck] + path = "/health" + interval = "1s" + timeout = "0.9s" + [[http.services.wsp2.loadBalancer.servers]] + url = "http://{{.Server2}}:80" + [http.services.wsp3.loadBalancer] + [http.services.wsp3.loadBalancer.healthcheck] + path = "/health" + interval = "1s" + timeout = "0.9s" + [[http.services.wsp3.loadBalancer.servers]] + url = "http://{{.Server3}}:80" + [http.services.wsp4.loadBalancer] + [http.services.wsp4.loadBalancer.healthcheck] + path = "/health" + interval = "1s" + timeout = "0.9s" + [[http.services.wsp4.loadBalancer.servers]] + url = "http://{{.Server4}}:80" + [http.services.wsp12b.loadBalancer] + [http.services.wsp12b.loadBalancer.healthcheck] + path = "/health" + interval = "1s" + timeout = "0.9s" + [[http.services.wsp12b.loadBalancer.servers]] + url = "http://{{.Server1}}:80" + [[http.services.wsp12b.loadBalancer.servers]] + url = "http://{{.Server2}}:80" diff --git a/integration/fixtures/healthcheck/propagate_no_healthcheck.toml b/integration/fixtures/healthcheck/propagate_no_healthcheck.toml new file mode 100644 index 000000000..6fb2a6432 --- /dev/null +++ b/integration/fixtures/healthcheck/propagate_no_healthcheck.toml @@ -0,0 +1,38 @@ +[global] + checkNewVersion = false + sendAnonymousUsage = false + +[log] + level = "DEBUG" + +[entryPoints] + [entryPoints.web] + address = ":8000" + +[api] + insecure = true + +[providers.file] + filename = "{{ .SelfFilename }}" + +## dynamic configuration ## + +[http.routers] + [http.routers.wsp-router-1] + service = "wsp-service1" + rule = "Host(`root.localhost`)" + + [http.routers.noop] + service = "noop@internal" + rule = "Host(`noop.localhost`)" + +[http.services] + [http.services.wsp-service1.weighted] + [http.services.wsp-service1.weighted.healthcheck] + [[http.services.wsp-service1.weighted.services]] + name = "wsp1" + weight = 1 + + [http.services.wsp1.loadBalancer] + [[http.services.wsp1.loadBalancer.servers]] + url = "http://{{.Server1}}:80" diff --git a/integration/fixtures/healthcheck/reload_with_healthcheck.toml b/integration/fixtures/healthcheck/reload_with_healthcheck.toml new file mode 100644 index 000000000..d2e794c91 --- /dev/null +++ b/integration/fixtures/healthcheck/reload_with_healthcheck.toml @@ -0,0 +1,46 @@ +[global] + checkNewVersion = false + sendAnonymousUsage = false + +[log] + level = "DEBUG" + +[entryPoints] + [entryPoints.web] + address = ":8000" + +[api] + insecure = true + +[providers.file] + filename = "{{ .SelfFilename }}" + +[http.routers] + [http.routers.wsp-router-1] + service = "wsp-service1" + rule = "Host(`root.localhost`)" + +[http.services] + [http.services.wsp-service1.weighted] + [http.services.wsp-service1.weighted.healthcheck] + [[http.services.wsp-service1.weighted.services]] + name = "wsp1" + weight = 1 + [[http.services.wsp-service1.weighted.services]] + name = "wsp2" + weight = 1 + + [http.services.wsp1.loadBalancer] + [http.services.wsp1.loadBalancer.healthcheck] + path = "/health" + interval = "1s" + timeout = "0.9s" + [[http.services.wsp1.loadBalancer.servers]] + url = "http://{{.Server1}}:80" + [http.services.wsp2.loadBalancer] + [http.services.wsp2.loadBalancer.healthcheck] + path = "/health" + interval = "1s" + timeout = "0.9s" + [[http.services.wsp2.loadBalancer.servers]] + url = "http://{{.Server2}}:80" diff --git 
a/integration/fixtures/healthcheck/reload_without_healthcheck.toml b/integration/fixtures/healthcheck/reload_without_healthcheck.toml new file mode 100644 index 000000000..350cb4fd9 --- /dev/null +++ b/integration/fixtures/healthcheck/reload_without_healthcheck.toml @@ -0,0 +1,46 @@ +[global] + checkNewVersion = false + sendAnonymousUsage = false + +[log] + level = "DEBUG" + +[entryPoints] + [entryPoints.web] + address = ":8000" + +[api] + insecure = true + +[providers.file] + filename = "{{ .SelfFilename }}" + +[http.routers] + [http.routers.wsp-router-1] + service = "wsp-service1" + rule = "Host(`root.localhost`)" + +[http.services] + [http.services.wsp-service1.weighted] +# [http.services.wsp-service1.weighted.healthcheck] + [[http.services.wsp-service1.weighted.services]] + name = "wsp1" + weight = 1 + [[http.services.wsp-service1.weighted.services]] + name = "wsp2" + weight = 1 + + [http.services.wsp1.loadBalancer] + [http.services.wsp1.loadBalancer.healthcheck] + path = "/health" + interval = "1s" + timeout = "0.9s" + [[http.services.wsp1.loadBalancer.servers]] + url = "http://{{.Server1}}:80" + [http.services.wsp2.loadBalancer] + [http.services.wsp2.loadBalancer.healthcheck] + path = "/health" + interval = "1s" + timeout = "0.9s" + [[http.services.wsp2.loadBalancer.servers]] + url = "http://{{.Server2}}:80" diff --git a/integration/healthcheck_test.go b/integration/healthcheck_test.go index 8d268ad85..c2b22d610 100644 --- a/integration/healthcheck_test.go +++ b/integration/healthcheck_test.go @@ -2,6 +2,8 @@ package integration import ( "bytes" + "fmt" + "io/ioutil" "net/http" "os" "time" @@ -16,6 +18,8 @@ type HealthCheckSuite struct { BaseSuite whoami1IP string whoami2IP string + whoami3IP string + whoami4IP string } func (s *HealthCheckSuite) SetUpSuite(c *check.C) { @@ -24,6 +28,8 @@ func (s *HealthCheckSuite) SetUpSuite(c *check.C) { s.whoami1IP = s.composeProject.Container(c, "whoami1").NetworkSettings.IPAddress s.whoami2IP = s.composeProject.Container(c, "whoami2").NetworkSettings.IPAddress + s.whoami3IP = s.composeProject.Container(c, "whoami3").NetworkSettings.IPAddress + s.whoami4IP = s.composeProject.Container(c, "whoami4").NetworkSettings.IPAddress } func (s *HealthCheckSuite) TestSimpleConfiguration(c *check.C) { @@ -271,3 +277,265 @@ func (s *HealthCheckSuite) TestMultipleRoutersOnSameService(c *check.C) { err = try.Request(healthReqWeb2, 3*time.Second, try.StatusCodeIs(http.StatusOK)) c.Assert(err, checker.IsNil) } + +func (s *HealthCheckSuite) TestPropagate(c *check.C) { + file := s.adaptFile(c, "fixtures/healthcheck/propagate.toml", struct { + Server1 string + Server2 string + Server3 string + Server4 string + }{s.whoami1IP, s.whoami2IP, s.whoami3IP, s.whoami4IP}) + defer os.Remove(file) + + cmd, display := s.traefikCmd(withConfigFile(file)) + defer display(c) + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer s.killCmd(cmd) + + // wait for traefik + err = try.GetRequest("http://127.0.0.1:8080/api/rawdata", 60*time.Second, try.BodyContains("Host(`root.localhost`)")) + c.Assert(err, checker.IsNil) + + rootReq, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000", nil) + c.Assert(err, checker.IsNil) + rootReq.Host = "root.localhost" + + err = try.Request(rootReq, 500*time.Millisecond, try.StatusCodeIs(http.StatusOK)) + c.Assert(err, checker.IsNil) + + // Bring whoami1 and whoami3 down + client := http.Client{ + Timeout: 10 * time.Second, + } + + whoamiHosts := []string{s.whoami1IP, s.whoami3IP} + for _, whoami := range whoamiHosts { + 
statusInternalServerErrorReq, err := http.NewRequest(http.MethodPost, "http://"+whoami+"/health", bytes.NewBuffer([]byte("500")))
+		c.Assert(err, checker.IsNil)
+		_, err = client.Do(statusInternalServerErrorReq)
+		c.Assert(err, checker.IsNil)
+	}
+
+	try.Sleep(time.Second)
+
+	// Verify load-balancing on root still works, and that we're getting wsp4, wsp2, wsp4, wsp2, etc.
+	var want string
+	for i := 0; i < 4; i++ {
+		if i%2 == 0 {
+			want = `IP: ` + s.whoami4IP
+		} else {
+			want = `IP: ` + s.whoami2IP
+		}
+
+		resp, err := client.Do(rootReq)
+		c.Assert(err, checker.IsNil)
+
+		body, err := ioutil.ReadAll(resp.Body)
+		c.Assert(err, checker.IsNil)
+
+		c.Assert(string(body), checker.Contains, want)
+	}
+
+	fooReq, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000", nil)
+	c.Assert(err, checker.IsNil)
+	fooReq.Host = "foo.localhost"
+
+	// Verify load-balancing on foo still works, and that we're getting wsp2, wsp2, wsp2, wsp2, etc.
+	want = `IP: ` + s.whoami2IP
+	for i := 0; i < 4; i++ {
+		resp, err := client.Do(fooReq)
+		c.Assert(err, checker.IsNil)
+
+		body, err := ioutil.ReadAll(resp.Body)
+		c.Assert(err, checker.IsNil)
+
+		c.Assert(string(body), checker.Contains, want)
+	}
+
+	barReq, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000", nil)
+	c.Assert(err, checker.IsNil)
+	barReq.Host = "bar.localhost"
+
+	// Verify load-balancing on bar still works, and that we're getting wsp2, wsp2, wsp2, wsp2, etc.
+	want = `IP: ` + s.whoami2IP
+	for i := 0; i < 4; i++ {
+		resp, err := client.Do(barReq)
+		c.Assert(err, checker.IsNil)
+
+		body, err := ioutil.ReadAll(resp.Body)
+		c.Assert(err, checker.IsNil)
+
+		c.Assert(string(body), checker.Contains, want)
+	}
+
+	// Bring whoami2 and whoami4 down
+	whoamiHosts = []string{s.whoami2IP, s.whoami4IP}
+	for _, whoami := range whoamiHosts {
+		statusInternalServerErrorReq, err := http.NewRequest(http.MethodPost, "http://"+whoami+"/health", bytes.NewBuffer([]byte("500")))
+		c.Assert(err, checker.IsNil)
+		_, err = client.Do(statusInternalServerErrorReq)
+		c.Assert(err, checker.IsNil)
+	}
+
+	try.Sleep(time.Second)
+
+	// Verify that everything is down, and that we get 503s everywhere.
+	for i := 0; i < 2; i++ {
+		resp, err := client.Do(rootReq)
+		c.Assert(err, checker.IsNil)
+		c.Assert(resp.StatusCode, checker.Equals, http.StatusServiceUnavailable)
+
+		resp, err = client.Do(fooReq)
+		c.Assert(err, checker.IsNil)
+		c.Assert(resp.StatusCode, checker.Equals, http.StatusServiceUnavailable)
+
+		resp, err = client.Do(barReq)
+		c.Assert(err, checker.IsNil)
+		c.Assert(resp.StatusCode, checker.Equals, http.StatusServiceUnavailable)
+	}
+
+	// Bring everything back up.
+	whoamiHosts = []string{s.whoami1IP, s.whoami2IP, s.whoami3IP, s.whoami4IP}
+	for _, whoami := range whoamiHosts {
+		statusOKReq, err := http.NewRequest(http.MethodPost, "http://"+whoami+"/health", bytes.NewBuffer([]byte("200")))
+		c.Assert(err, checker.IsNil)
+		_, err = client.Do(statusOKReq)
+		c.Assert(err, checker.IsNil)
+	}
+
+	try.Sleep(time.Second)
+
+	// Verify everything is up on root router.
+	wantIPs := []string{s.whoami3IP, s.whoami1IP, s.whoami4IP, s.whoami2IP}
+	for i := 0; i < 4; i++ {
+		want := `IP: ` + wantIPs[i]
+		resp, err := client.Do(rootReq)
+		c.Assert(err, checker.IsNil)
+
+		body, err := ioutil.ReadAll(resp.Body)
+		c.Assert(err, checker.IsNil)
+
+		c.Assert(string(body), checker.Contains, want)
+	}
+
+	// Verify everything is up on foo router.
+	wantIPs = []string{s.whoami1IP, s.whoami1IP, s.whoami3IP, s.whoami2IP}
+	for i := 0; i < 4; i++ {
+		want := `IP: ` + wantIPs[i]
+		resp, err := client.Do(fooReq)
+		c.Assert(err, checker.IsNil)
+
+		body, err := ioutil.ReadAll(resp.Body)
+		c.Assert(err, checker.IsNil)
+
+		c.Assert(string(body), checker.Contains, want)
+	}
+
+	// Verify everything is up on bar router.
+	wantIPs = []string{s.whoami1IP, s.whoami1IP, s.whoami3IP, s.whoami2IP}
+	for i := 0; i < 4; i++ {
+		want := `IP: ` + wantIPs[i]
+		resp, err := client.Do(barReq)
+		c.Assert(err, checker.IsNil)
+
+		body, err := ioutil.ReadAll(resp.Body)
+		c.Assert(err, checker.IsNil)
+
+		c.Assert(string(body), checker.Contains, want)
+	}
+}
+
+func (s *HealthCheckSuite) TestPropagateNoHealthCheck(c *check.C) {
+	file := s.adaptFile(c, "fixtures/healthcheck/propagate_no_healthcheck.toml", struct {
+		Server1 string
+	}{s.whoami1IP})
+	defer os.Remove(file)
+
+	cmd, display := s.traefikCmd(withConfigFile(file))
+	defer display(c)
+	err := cmd.Start()
+	c.Assert(err, checker.IsNil)
+	defer s.killCmd(cmd)
+
+	// wait for traefik
+	err = try.GetRequest("http://127.0.0.1:8080/api/rawdata", 60*time.Second, try.BodyContains("Host(`noop.localhost`)"), try.BodyNotContains("Host(`root.localhost`)"))
+	c.Assert(err, checker.IsNil)
+
+	rootReq, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000", nil)
+	c.Assert(err, checker.IsNil)
+	rootReq.Host = "root.localhost"
+
+	err = try.Request(rootReq, 500*time.Millisecond, try.StatusCodeIs(http.StatusNotFound))
+	c.Assert(err, checker.IsNil)
+}
+
+func (s *HealthCheckSuite) TestPropagateReload(c *check.C) {
+	// Set up a WSP service without the healthcheck enabled (wsp-service1)
+	withoutHealthCheck := s.adaptFile(c, "fixtures/healthcheck/reload_without_healthcheck.toml", struct {
+		Server1 string
+		Server2 string
+	}{s.whoami1IP, s.whoami2IP})
+	defer os.Remove(withoutHealthCheck)
+	withHealthCheck := s.adaptFile(c, "fixtures/healthcheck/reload_with_healthcheck.toml", struct {
+		Server1 string
+		Server2 string
+	}{s.whoami1IP, s.whoami2IP})
+	defer os.Remove(withHealthCheck)
+
+	cmd, display := s.traefikCmd(withConfigFile(withoutHealthCheck))
+	defer display(c)
+	err := cmd.Start()
+	c.Assert(err, checker.IsNil)
+	defer s.killCmd(cmd)
+
+	// wait for traefik
+	err = try.GetRequest("http://127.0.0.1:8080/api/rawdata", 60*time.Second, try.BodyContains("Host(`root.localhost`)"))
+	c.Assert(err, checker.IsNil)
+
+	// Make one of the underlying services (whoami2) fail the health checks of all of its servers
+	client := http.Client{
+		Timeout: 10 * time.Second,
+	}
+	statusInternalServerErrorReq, err := http.NewRequest(http.MethodPost, "http://"+s.whoami2IP+"/health", bytes.NewBuffer([]byte("500")))
+	c.Assert(err, checker.IsNil)
+	_, err = client.Do(statusInternalServerErrorReq)
+	c.Assert(err, checker.IsNil)
+
+	rootReq, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000", nil)
+	c.Assert(err, checker.IsNil)
+	rootReq.Host = "root.localhost"
+
+	// Check that the service whose servers all failed (wsp2) still gets requests, and answers with a 503
+	err = try.Request(rootReq, 500*time.Millisecond, try.StatusCodeIs(http.StatusServiceUnavailable))
+	c.Assert(err, checker.IsNil)
+
+	// Enable the healthcheck on the root WSP (wsp-service1) and let Traefik reload the config
+	fr1, err := os.OpenFile(withoutHealthCheck, os.O_APPEND|os.O_WRONLY, 0o644)
+	c.Assert(fr1, checker.NotNil)
+	c.Assert(err, checker.IsNil)
+	err = fr1.Truncate(0)
+	c.Assert(err, checker.IsNil)
+
+	fr2, err := os.ReadFile(withHealthCheck)
+	c.Assert(err, checker.IsNil)
+	_, err = fmt.Fprint(fr1, 
string(fr2)) + c.Assert(err, checker.IsNil) + err = fr1.Close() + c.Assert(err, checker.IsNil) + + try.Sleep(1 * time.Second) + + // Check the failed service (whoami2) is not getting requests + wantIPs := []string{s.whoami1IP, s.whoami1IP, s.whoami1IP, s.whoami1IP} + for _, ip := range wantIPs { + want := "IP: " + ip + resp, err := client.Do(rootReq) + c.Assert(err, checker.IsNil) + + body, err := ioutil.ReadAll(resp.Body) + c.Assert(err, checker.IsNil) + + c.Assert(string(body), checker.Contains, want) + } +} diff --git a/integration/resources/compose/healthcheck.yml b/integration/resources/compose/healthcheck.yml index c81fb293a..c52f5bcde 100644 --- a/integration/resources/compose/healthcheck.yml +++ b/integration/resources/compose/healthcheck.yml @@ -3,3 +3,9 @@ whoami1: whoami2: image: traefik/whoami + +whoami3: + image: traefik/whoami + +whoami4: + image: traefik/whoami diff --git a/pkg/anonymize/anonymize_config_test.go b/pkg/anonymize/anonymize_config_test.go index e8f2703aa..8a1133fca 100644 --- a/pkg/anonymize/anonymize_config_test.go +++ b/pkg/anonymize/anonymize_config_test.go @@ -77,7 +77,7 @@ func TestDo_dynamicConfiguration(t *testing.T) { SameSite: "foo", }, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Scheme: "foo", Path: "foo", Port: 42, diff --git a/pkg/config/dynamic/http_config.go b/pkg/config/dynamic/http_config.go index ec359a656..c2f335e9a 100644 --- a/pkg/config/dynamic/http_config.go +++ b/pkg/config/dynamic/http_config.go @@ -65,6 +65,7 @@ type Mirroring struct { Service string `json:"service,omitempty" toml:"service,omitempty" yaml:"service,omitempty" export:"true"` MaxBodySize *int64 `json:"maxBodySize,omitempty" toml:"maxBodySize,omitempty" yaml:"maxBodySize,omitempty" export:"true"` Mirrors []MirrorService `json:"mirrors,omitempty" toml:"mirrors,omitempty" yaml:"mirrors,omitempty" export:"true"` + HealthCheck *HealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"` } // SetDefaults Default values for a WRRService. @@ -87,6 +88,12 @@ type MirrorService struct { type WeightedRoundRobin struct { Services []WRRService `json:"services,omitempty" toml:"services,omitempty" yaml:"services,omitempty" export:"true"` Sticky *Sticky `json:"sticky,omitempty" toml:"sticky,omitempty" yaml:"sticky,omitempty" export:"true"` + // HealthCheck enables automatic self-healthcheck for this service, i.e. + // whenever one of its children is reported as down, this service becomes aware of it, + // and takes it into account (i.e. it ignores the down child) when running the + // load-balancing algorithm. In addition, if the parent of this service also has + // HealthCheck enabled, this service reports to its parent any status change. + HealthCheck *HealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"` } // +k8s:deepcopy-gen=true @@ -124,9 +131,13 @@ type Cookie struct { // ServersLoadBalancer holds the ServersLoadBalancer configuration. 
type ServersLoadBalancer struct { - Sticky *Sticky `json:"sticky,omitempty" toml:"sticky,omitempty" yaml:"sticky,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"` - Servers []Server `json:"servers,omitempty" toml:"servers,omitempty" yaml:"servers,omitempty" label-slice-as-struct:"server" export:"true"` - HealthCheck *HealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" export:"true"` + Sticky *Sticky `json:"sticky,omitempty" toml:"sticky,omitempty" yaml:"sticky,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"` + Servers []Server `json:"servers,omitempty" toml:"servers,omitempty" yaml:"servers,omitempty" label-slice-as-struct:"server" export:"true"` + // HealthCheck enables regular active checks of the responsiveness of the + // children servers of this load-balancer. To propagate status changes (e.g. all + // servers of this service are down) upwards, HealthCheck must also be enabled on + // the parent(s) of this service. + HealthCheck *ServerHealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" export:"true"` PassHostHeader *bool `json:"passHostHeader" toml:"passHostHeader" yaml:"passHostHeader" export:"true"` ResponseForwarding *ResponseForwarding `json:"responseForwarding,omitempty" toml:"responseForwarding,omitempty" yaml:"responseForwarding,omitempty" export:"true"` ServersTransport string `json:"serversTransport,omitempty" toml:"serversTransport,omitempty" yaml:"serversTransport,omitempty" export:"true"` @@ -178,8 +189,8 @@ func (s *Server) SetDefaults() { // +k8s:deepcopy-gen=true -// HealthCheck holds the HealthCheck configuration. -type HealthCheck struct { +// ServerHealthCheck holds the HealthCheck configuration. +type ServerHealthCheck struct { Scheme string `json:"scheme,omitempty" toml:"scheme,omitempty" yaml:"scheme,omitempty" export:"true"` Path string `json:"path,omitempty" toml:"path,omitempty" yaml:"path,omitempty" export:"true"` Port int `json:"port,omitempty" toml:"port,omitempty,omitzero" yaml:"port,omitempty" export:"true"` @@ -193,13 +204,18 @@ type HealthCheck struct { } // SetDefaults Default values for a HealthCheck. -func (h *HealthCheck) SetDefaults() { +func (h *ServerHealthCheck) SetDefaults() { fr := true h.FollowRedirects = &fr } // +k8s:deepcopy-gen=true +// HealthCheck controls healthcheck awareness and propagation at the services level. +type HealthCheck struct{} + +// +k8s:deepcopy-gen=true + // ServersTransport options to configure communication between Traefik and the servers. type ServersTransport struct { ServerName string `description:"ServerName used to contact the server" json:"serverName,omitempty" toml:"serverName,omitempty" yaml:"serverName,omitempty"` diff --git a/pkg/config/dynamic/zz_generated.deepcopy.go b/pkg/config/dynamic/zz_generated.deepcopy.go index 3c0ee71c0..742e59c65 100644 --- a/pkg/config/dynamic/zz_generated.deepcopy.go +++ b/pkg/config/dynamic/zz_generated.deepcopy.go @@ -513,18 +513,6 @@ func (in *Headers) DeepCopy() *Headers { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *HealthCheck) DeepCopyInto(out *HealthCheck) { *out = *in - if in.FollowRedirects != nil { - in, out := &in.FollowRedirects, &out.FollowRedirects - *out = new(bool) - **out = **in - } - if in.Headers != nil { - in, out := &in.Headers, &out.Headers - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - } return } @@ -789,6 +777,11 @@ func (in *Mirroring) DeepCopyInto(out *Mirroring) { *out = make([]MirrorService, len(*in)) copy(*out, *in) } + if in.HealthCheck != nil { + in, out := &in.HealthCheck, &out.HealthCheck + *out = new(HealthCheck) + **out = **in + } return } @@ -1075,6 +1068,34 @@ func (in *Server) DeepCopy() *Server { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ServerHealthCheck) DeepCopyInto(out *ServerHealthCheck) { + *out = *in + if in.FollowRedirects != nil { + in, out := &in.FollowRedirects, &out.FollowRedirects + *out = new(bool) + **out = **in + } + if in.Headers != nil { + in, out := &in.Headers, &out.Headers + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServerHealthCheck. +func (in *ServerHealthCheck) DeepCopy() *ServerHealthCheck { + if in == nil { + return nil + } + out := new(ServerHealthCheck) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ServersLoadBalancer) DeepCopyInto(out *ServersLoadBalancer) { *out = *in @@ -1090,7 +1111,7 @@ func (in *ServersLoadBalancer) DeepCopyInto(out *ServersLoadBalancer) { } if in.HealthCheck != nil { in, out := &in.HealthCheck, &out.HealthCheck - *out = new(HealthCheck) + *out = new(ServerHealthCheck) (*in).DeepCopyInto(*out) } if in.PassHostHeader != nil { @@ -1826,6 +1847,11 @@ func (in *WeightedRoundRobin) DeepCopyInto(out *WeightedRoundRobin) { *out = new(Sticky) (*in).DeepCopyInto(*out) } + if in.HealthCheck != nil { + in, out := &in.HealthCheck, &out.HealthCheck + *out = new(HealthCheck) + **out = **in + } return } diff --git a/pkg/config/label/label_test.go b/pkg/config/label/label_test.go index 7935418d9..fca311d8e 100644 --- a/pkg/config/label/label_test.go +++ b/pkg/config/label/label_test.go @@ -625,7 +625,7 @@ func TestDecodeConfiguration(t *testing.T) { Port: "8080", }, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Scheme: "foobar", Path: "foobar", Port: 42, @@ -652,7 +652,7 @@ func TestDecodeConfiguration(t *testing.T) { Port: "8080", }, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Scheme: "foobar", Path: "foobar", Port: 42, @@ -1100,7 +1100,7 @@ func TestEncodeConfiguration(t *testing.T) { Port: "8080", }, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Scheme: "foobar", Path: "foobar", Port: 42, @@ -1126,7 +1126,7 @@ func TestEncodeConfiguration(t *testing.T) { Port: "8080", }, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Scheme: "foobar", Path: "foobar", Port: 42, diff --git a/pkg/config/runtime/runtime_test.go b/pkg/config/runtime/runtime_test.go index 4dce299dd..5ecb8864d 100644 --- a/pkg/config/runtime/runtime_test.go +++ b/pkg/config/runtime/runtime_test.go @@ -48,7 +48,7 @@ func TestPopulateUsedBy(t *testing.T) { {URL: "http://127.0.0.1:8085"}, {URL: 
"http://127.0.0.1:8086"}, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Interval: "500ms", Path: "/health", }, @@ -158,7 +158,7 @@ func TestPopulateUsedBy(t *testing.T) { URL: "http://127.0.0.1:8086", }, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Interval: "500ms", Path: "/health", }, @@ -176,7 +176,7 @@ func TestPopulateUsedBy(t *testing.T) { URL: "http://127.0.0.1:8088", }, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Interval: "500ms", Path: "/health", }, diff --git a/pkg/healthcheck/healthcheck.go b/pkg/healthcheck/healthcheck.go index 2ba13feca..fd633f713 100644 --- a/pkg/healthcheck/healthcheck.go +++ b/pkg/healthcheck/healthcheck.go @@ -2,6 +2,7 @@ package healthcheck import ( "context" + "errors" "fmt" "net" "net/http" @@ -11,6 +12,7 @@ import ( "time" gokitmetrics "github.com/go-kit/kit/metrics" + "github.com/traefik/traefik/v2/pkg/config/dynamic" "github.com/traefik/traefik/v2/pkg/config/runtime" "github.com/traefik/traefik/v2/pkg/log" "github.com/traefik/traefik/v2/pkg/metrics" @@ -41,6 +43,13 @@ type BalancerHandler interface { Balancer } +// BalancerStatusHandler is an http Handler that does load-balancing, +// andupdates its parents of its status. +type BalancerStatusHandler interface { + BalancerHandler + StatusUpdater +} + type metricsHealthcheck struct { serverUpGauge gokitmetrics.Gauge } @@ -130,9 +139,10 @@ func (hc *HealthCheck) SetBackendsConfiguration(parentCtx context.Context, backe func (hc *HealthCheck) execute(ctx context.Context, backend *BackendConfig) { logger := log.FromContext(ctx) - logger.Debugf("Initial health check for backend: %q", backend.name) - hc.checkBackend(ctx, backend) + logger.Debugf("Initial health check for backend: %q", backend.name) + hc.checkServersLB(ctx, backend) + ticker := time.NewTicker(backend.Interval) defer ticker.Stop() for { @@ -141,13 +151,13 @@ func (hc *HealthCheck) execute(ctx context.Context, backend *BackendConfig) { logger.Debugf("Stopping current health check goroutines of backend: %s", backend.name) return case <-ticker.C: - logger.Debugf("Refreshing health check for backend: %s", backend.name) - hc.checkBackend(ctx, backend) + logger.Debugf("Routine health check refresh for backend: %s", backend.name) + hc.checkServersLB(ctx, backend) } } } -func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig) { +func (hc *HealthCheck) checkServersLB(ctx context.Context, backend *BackendConfig) { logger := log.FromContext(ctx) enabledURLs := backend.LB.Servers() @@ -157,12 +167,11 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig) serverUpMetricValue := float64(0) if err := checkHealth(disabledURL.url, backend); err == nil { - logger.Warnf("Health check up: Returning to server list. Backend: %q URL: %q Weight: %d", + logger.Warnf("Health check up: returning to server list. Backend: %q URL: %q Weight: %d", backend.name, disabledURL.url.String(), disabledURL.weight) if err = backend.LB.UpsertServer(disabledURL.url, roundrobin.Weight(disabledURL.weight)); err != nil { logger.Error(err) } - serverUpMetricValue = 1 } else { logger.Warnf("Health check still failing. 
Backend: %q URL: %q Reason: %s",
 backend.name, disabledURL.url.String(), err)
@@ -175,31 +184,31 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig)
 backend.disabledURLs = newDisabledURLs
- for _, enableURL := range enabledURLs {
+ for _, enabledURL := range enabledURLs {
 serverUpMetricValue := float64(1)
- if err := checkHealth(enableURL, backend); err != nil {
+ if err := checkHealth(enabledURL, backend); err != nil {
 weight := 1
 rr, ok := backend.LB.(*roundrobin.RoundRobin)
 if ok {
 var gotWeight bool
- weight, gotWeight = rr.ServerWeight(enableURL)
+ weight, gotWeight = rr.ServerWeight(enabledURL)
 if !gotWeight {
 weight = 1
 }
 }
 logger.Warnf("Health check failed, removing from server list. Backend: %q URL: %q Weight: %d Reason: %s",
- backend.name, enableURL.String(), weight, err)
- if err := backend.LB.RemoveServer(enableURL); err != nil {
+ backend.name, enabledURL.String(), weight, err)
+ if err := backend.LB.RemoveServer(enabledURL); err != nil {
 logger.Error(err)
 }
- backend.disabledURLs = append(backend.disabledURLs, backendURL{enableURL, weight})
+ backend.disabledURLs = append(backend.disabledURLs, backendURL{enabledURL, weight})
 serverUpMetricValue = 0
 }
- labelValues := []string{"service", backend.name, "url", enableURL.String()}
+ labelValues := []string{"service", backend.name, "url", enabledURL.String()}
 hc.metrics.serverUpGauge.With(labelValues...).Set(serverUpMetricValue)
 }
}
@@ -264,11 +273,19 @@ func checkHealth(serverURL *url.URL, backend *BackendConfig) error {
 return nil
}
+// StatusUpdater should be implemented by a service that, when its status
+// changes (e.g. all of its children are down), needs to propagate that change
+// upwards, to its parent(s).
+type StatusUpdater interface {
+	RegisterStatusUpdater(fn func(up bool)) error
+}
+
// NewLBStatusUpdater returns a new LbStatusUpdater.
-func NewLBStatusUpdater(bh BalancerHandler, info *runtime.ServiceInfo) *LbStatusUpdater {
+func NewLBStatusUpdater(bh BalancerHandler, info *runtime.ServiceInfo, hc *dynamic.ServerHealthCheck) *LbStatusUpdater {
 return &LbStatusUpdater{
- BalancerHandler: bh,
- serviceInfo: info,
+ BalancerHandler: bh,
+ serviceInfo: info,
+ wantsHealthCheck: hc != nil,
 }
}
@@ -276,27 +293,83 @@ func NewLBStatusUpdater(bh BalancerHandler, info *runtime.ServiceInfo) *LbStatus
// so it can keep track of the status of a server in the ServiceInfo.
type LbStatusUpdater struct {
 BalancerHandler
- serviceInfo *runtime.ServiceInfo // can be nil
+ serviceInfo *runtime.ServiceInfo // can be nil
+ updaters []func(up bool)
+ wantsHealthCheck bool
+}
+
+// RegisterStatusUpdater adds fn to the list of hooks that are run when the
+// status of the Balancer changes.
+// Not thread safe.
+func (lb *LbStatusUpdater) RegisterStatusUpdater(fn func(up bool)) error {
+	if !lb.wantsHealthCheck {
+		return errors.New("healthCheck not enabled in config for this loadbalancer service")
+	}
+
+	lb.updaters = append(lb.updaters, fn)
+	return nil
}
// RemoveServer removes the given server from the BalancerHandler,
// and updates the status of the server to "DOWN".
func (lb *LbStatusUpdater) RemoveServer(u *url.URL) error {
+	// TODO(mpl): when we have the freedom to change the signature of RemoveServer
+	// (kinda stuck because of oxy for now), let's pass around a context to improve
+	// logging.
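+	//
+	// Note: status propagation in RemoveServer and UpsertServer below is
+	// transition-based. Each method compares the number of available servers
+	// before and after the change, and the registered updaters only run when
+	// the balancer as a whole flips between "at least one server up" and
+	// "all servers down".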
+ ctx := context.TODO() + upBefore := len(lb.BalancerHandler.Servers()) > 0 err := lb.BalancerHandler.RemoveServer(u) - if err == nil && lb.serviceInfo != nil { + if err != nil { + return err + } + if lb.serviceInfo != nil { lb.serviceInfo.UpdateServerStatus(u.String(), serverDown) } - return err + log.FromContext(ctx).Debugf("child %s now %s", u.String(), serverDown) + + if !upBefore { + // we were already down, and we still are, no need to propagate. + log.FromContext(ctx).Debugf("Still %s, no need to propagate", serverDown) + return nil + } + if len(lb.BalancerHandler.Servers()) > 0 { + // we were up, and we still are, no need to propagate + log.FromContext(ctx).Debugf("Still %s, no need to propagate", serverUp) + return nil + } + + log.FromContext(ctx).Debugf("Propagating new %s status", serverDown) + for _, fn := range lb.updaters { + fn(false) + } + return nil } // UpsertServer adds the given server to the BalancerHandler, // and updates the status of the server to "UP". func (lb *LbStatusUpdater) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error { + ctx := context.TODO() + upBefore := len(lb.BalancerHandler.Servers()) > 0 err := lb.BalancerHandler.UpsertServer(u, options...) - if err == nil && lb.serviceInfo != nil { + if err != nil { + return err + } + if lb.serviceInfo != nil { lb.serviceInfo.UpdateServerStatus(u.String(), serverUp) } - return err + log.FromContext(ctx).Debugf("child %s now %s", u.String(), serverUp) + + if upBefore { + // we were up, and we still are, no need to propagate + log.FromContext(ctx).Debugf("Still %s, no need to propagate", serverUp) + return nil + } + + log.FromContext(ctx).Debugf("Propagating new %s status", serverUp) + for _, fn := range lb.updaters { + fn(true) + } + return nil } // Balancers is a list of Balancers(s) that implements the Balancer interface. diff --git a/pkg/healthcheck/healthcheck_test.go b/pkg/healthcheck/healthcheck_test.go index 594665ae7..88972419e 100644 --- a/pkg/healthcheck/healthcheck_test.go +++ b/pkg/healthcheck/healthcheck_test.go @@ -445,7 +445,7 @@ func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func TestLBStatusUpdater(t *testing.T) { lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}} svInfo := &runtime.ServiceInfo{} - lbsu := NewLBStatusUpdater(lb, svInfo) + lbsu := NewLBStatusUpdater(lb, svInfo, nil) newServer, err := url.Parse("http://foo.com") assert.NoError(t, err) err = lbsu.UpsertServer(newServer, roundrobin.Weight(1)) diff --git a/pkg/middlewares/emptybackendhandler/empty_backend_handler.go b/pkg/middlewares/emptybackendhandler/empty_backend_handler.go index 1b503fc5c..3331bf3e3 100644 --- a/pkg/middlewares/emptybackendhandler/empty_backend_handler.go +++ b/pkg/middlewares/emptybackendhandler/empty_backend_handler.go @@ -10,24 +10,25 @@ import ( // has at least one active Server in respect to the healthchecks and if this // is not the case, it will stop the middleware chain and respond with 503. type emptyBackend struct { - next healthcheck.BalancerHandler + healthcheck.BalancerStatusHandler } // New creates a new EmptyBackend middleware. -func New(lb healthcheck.BalancerHandler) http.Handler { - return &emptyBackend{next: lb} +func New(lb healthcheck.BalancerStatusHandler) http.Handler { + return &emptyBackend{BalancerStatusHandler: lb} } // ServeHTTP responds with 503 when there is no active Server and otherwise // invokes the next handler in the middleware chain. 
func (e *emptyBackend) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - if len(e.next.Servers()) == 0 { - rw.WriteHeader(http.StatusServiceUnavailable) - _, err := rw.Write([]byte(http.StatusText(http.StatusServiceUnavailable))) - if err != nil { - http.Error(rw, err.Error(), http.StatusInternalServerError) - } - } else { - e.next.ServeHTTP(rw, req) + if len(e.BalancerStatusHandler.Servers()) != 0 { + e.BalancerStatusHandler.ServeHTTP(rw, req) + return + } + + rw.WriteHeader(http.StatusServiceUnavailable) + if _, err := rw.Write([]byte(http.StatusText(http.StatusServiceUnavailable))); err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return } } diff --git a/pkg/middlewares/emptybackendhandler/empty_backend_handler_test.go b/pkg/middlewares/emptybackendhandler/empty_backend_handler_test.go index 26a0a9990..b0d9714d2 100644 --- a/pkg/middlewares/emptybackendhandler/empty_backend_handler_test.go +++ b/pkg/middlewares/emptybackendhandler/empty_backend_handler_test.go @@ -48,6 +48,10 @@ type healthCheckLoadBalancer struct { amountServer int } +func (lb *healthCheckLoadBalancer) RegisterStatusUpdater(fn func(up bool)) error { + return nil +} + func (lb *healthCheckLoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } diff --git a/pkg/provider/kv/kv_test.go b/pkg/provider/kv/kv_test.go index 24dc3b86b..38896001a 100644 --- a/pkg/provider/kv/kv_test.go +++ b/pkg/provider/kv/kv_test.go @@ -628,7 +628,7 @@ func Test_buildConfiguration(t *testing.T) { Scheme: "http", }, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Scheme: "foobar", Path: "foobar", Port: 42, diff --git a/pkg/server/router/router_test.go b/pkg/server/router/router_test.go index ec38f1f74..d6659fb02 100644 --- a/pkg/server/router/router_test.go +++ b/pkg/server/router/router_test.go @@ -470,7 +470,7 @@ func TestRuntimeConfiguration(t *testing.T) { URL: "http://127.0.0.1:8086", }, }, - HealthCheck: &dynamic.HealthCheck{ + HealthCheck: &dynamic.ServerHealthCheck{ Interval: "500ms", Path: "/health", }, diff --git a/pkg/server/server_entrypoint_tcp_test.go b/pkg/server/server_entrypoint_tcp_test.go index 44b471b2f..6b461632c 100644 --- a/pkg/server/server_entrypoint_tcp_test.go +++ b/pkg/server/server_entrypoint_tcp_test.go @@ -94,8 +94,8 @@ func testShutdown(t *testing.T, router *tcp.Router) { // We need to do a write on the conn before the shutdown to make it "exist". // Because the connection indeed exists as far as TCP is concerned, - // but since we only pass it along to the HTTP server after at least one byte is peaked, - // the HTTP server (and hence its shutdown) does not know about the connection until that first byte peaking. + // but since we only pass it along to the HTTP server after at least one byte is peeked, + // the HTTP server (and hence its shutdown) does not know about the connection until that first byte peeked. 
err = request.Write(conn)
 require.NoError(t, err)
diff --git a/pkg/server/service/loadbalancer/mirror/mirror.go b/pkg/server/service/loadbalancer/mirror/mirror.go
index adecf748d..212d88731 100644
--- a/pkg/server/service/loadbalancer/mirror/mirror.go
+++ b/pkg/server/service/loadbalancer/mirror/mirror.go
@@ -11,6 +11,8 @@ import (
 "net/http"
 "sync"
+ "github.com/traefik/traefik/v2/pkg/config/dynamic"
+ "github.com/traefik/traefik/v2/pkg/healthcheck"
 "github.com/traefik/traefik/v2/pkg/log"
 "github.com/traefik/traefik/v2/pkg/middlewares/accesslog"
 "github.com/traefik/traefik/v2/pkg/safe"
@@ -23,19 +25,21 @@ type Mirroring struct {
 rw http.ResponseWriter
 routinePool *safe.Pool
- maxBodySize int64
+ maxBodySize int64
+ wantsHealthCheck bool
 lock sync.RWMutex
 total uint64
}
// New returns a new instance of *Mirroring.
-func New(handler http.Handler, pool *safe.Pool, maxBodySize int64) *Mirroring {
+func New(handler http.Handler, pool *safe.Pool, maxBodySize int64, hc *dynamic.HealthCheck) *Mirroring {
 return &Mirroring{
- routinePool: pool,
- handler: handler,
- rw: blackHoleResponseWriter{},
- maxBodySize: maxBodySize,
+ routinePool: pool,
+ handler: handler,
+ rw: blackHoleResponseWriter{},
+ maxBodySize: maxBodySize,
+ wantsHealthCheck: hc != nil,
 }
}
@@ -134,6 +138,31 @@ func (m *Mirroring) AddMirror(handler http.Handler, percent int) error {
 return nil
}
+// RegisterStatusUpdater adds fn to the list of hooks that are run when the
+// status of the main handler of the Mirroring changes.
+// Not thread safe.
+func (m *Mirroring) RegisterStatusUpdater(fn func(up bool)) error {
+	// Since the status propagation is completely transparent through the
+	// mirroring (because of the recursion on the underlying service), we could maybe
+	// skip that below, and even not add HealthCheck as a field of
+	// dynamic.Mirroring. But I think it's easier to understand for the user
+	// if the HealthCheck is required absolutely everywhere in the config.
+ if !m.wantsHealthCheck { + return errors.New("healthCheck not enabled in config for this mirroring service") + } + + updater, ok := m.handler.(healthcheck.StatusUpdater) + if !ok { + return fmt.Errorf("service of mirroring %T not a healthcheck.StatusUpdater", m.handler) + } + + if err := updater.RegisterStatusUpdater(fn); err != nil { + return fmt.Errorf("cannot register service of mirroring as updater: %w", err) + } + + return nil +} + type blackHoleResponseWriter struct{} func (b blackHoleResponseWriter) Flush() {} diff --git a/pkg/server/service/loadbalancer/mirror/mirror_test.go b/pkg/server/service/loadbalancer/mirror/mirror_test.go index 7353f8bea..a23b1d9fd 100644 --- a/pkg/server/service/loadbalancer/mirror/mirror_test.go +++ b/pkg/server/service/loadbalancer/mirror/mirror_test.go @@ -21,7 +21,7 @@ func TestMirroringOn100(t *testing.T) { rw.WriteHeader(http.StatusOK) }) pool := safe.NewPool(context.Background()) - mirror := New(handler, pool, defaultMaxBodySize) + mirror := New(handler, pool, defaultMaxBodySize, nil) err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { atomic.AddInt32(&countMirror1, 1) }), 10) @@ -50,7 +50,7 @@ func TestMirroringOn10(t *testing.T) { rw.WriteHeader(http.StatusOK) }) pool := safe.NewPool(context.Background()) - mirror := New(handler, pool, defaultMaxBodySize) + mirror := New(handler, pool, defaultMaxBodySize, nil) err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { atomic.AddInt32(&countMirror1, 1) }), 10) @@ -74,7 +74,7 @@ func TestMirroringOn10(t *testing.T) { } func TestInvalidPercent(t *testing.T) { - mirror := New(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), safe.NewPool(context.Background()), defaultMaxBodySize) + mirror := New(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), safe.NewPool(context.Background()), defaultMaxBodySize, nil) err := mirror.AddMirror(nil, -1) assert.Error(t, err) @@ -93,7 +93,7 @@ func TestHijack(t *testing.T) { rw.WriteHeader(http.StatusOK) }) pool := safe.NewPool(context.Background()) - mirror := New(handler, pool, defaultMaxBodySize) + mirror := New(handler, pool, defaultMaxBodySize, nil) var mirrorRequest bool err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { @@ -117,7 +117,7 @@ func TestFlush(t *testing.T) { rw.WriteHeader(http.StatusOK) }) pool := safe.NewPool(context.Background()) - mirror := New(handler, pool, defaultMaxBodySize) + mirror := New(handler, pool, defaultMaxBodySize, nil) var mirrorRequest bool err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { @@ -154,7 +154,7 @@ func TestMirroringWithBody(t *testing.T) { rw.WriteHeader(http.StatusOK) }) - mirror := New(handler, pool, defaultMaxBodySize) + mirror := New(handler, pool, defaultMaxBodySize, nil) for i := 0; i < numMirrors; i++ { err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { diff --git a/pkg/server/service/loadbalancer/wrr/wrr.go b/pkg/server/service/loadbalancer/wrr/wrr.go index bef0042f9..c9fc0c7e4 100644 --- a/pkg/server/service/loadbalancer/wrr/wrr.go +++ b/pkg/server/service/loadbalancer/wrr/wrr.go @@ -2,6 +2,7 @@ package wrr import ( "container/heap" + "context" "errors" "fmt" "net/http" @@ -30,16 +31,28 @@ type stickyCookie struct { // Entries have deadlines set at currentDeadline + 1 / weight, // providing weighted round robin behavior with floating point weights and an O(log n) pick time. 
 type Balancer struct {
-	stickyCookie *stickyCookie
+	stickyCookie     *stickyCookie
+	wantsHealthCheck bool

 	mutex       sync.RWMutex
 	handlers    []*namedHandler
 	curDeadline float64
+	// status is a record of which child services of the Balancer are healthy, keyed
+	// by the name of the child service. A child is added to the map by AddService,
+	// and it is later removed from or re-added to the map by SetStatus, as its
+	// health changes.
+	status map[string]struct{}
+	// updaters is the list of hooks that are run (to update the status of the
+	// Balancer's parent(s)) whenever the status of the Balancer changes.
+	updaters []func(bool)
 }

 // New creates a new load balancer.
-func New(sticky *dynamic.Sticky) *Balancer {
-	balancer := &Balancer{}
+func New(sticky *dynamic.Sticky, hc *dynamic.HealthCheck) *Balancer {
+	balancer := &Balancer{
+		status:           make(map[string]struct{}),
+		wantsHealthCheck: hc != nil,
+	}
 	if sticky != nil && sticky.Cookie != nil {
 		balancer.stickyCookie = &stickyCookie{
 			name: sticky.Cookie.Name,
@@ -81,6 +94,58 @@ func (b *Balancer) Pop() interface{} {
 	return h
 }

+// SetStatus records that the given child service of the Balancer is now of the
+// given status (up or down). The context is only used for logging.
+func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) {
+	b.mutex.Lock()
+	defer b.mutex.Unlock()
+
+	upBefore := len(b.status) > 0
+
+	status := "DOWN"
+	if up {
+		status = "UP"
+	}
+	log.FromContext(ctx).Debugf("Setting status of %s to %s", childName, status)
+	if up {
+		b.status[childName] = struct{}{}
+	} else {
+		delete(b.status, childName)
+	}
+
+	upAfter := len(b.status) > 0
+	status = "DOWN"
+	if upAfter {
+		status = "UP"
+	}
+
+	// No status change: no propagation needed.
+	if upBefore == upAfter {
+		log.FromContext(ctx).Debugf("Still %s, no need to propagate", status)
+		return
+	}
+
+	// Status change: run all the registered hooks to notify the parent(s).
+	log.FromContext(ctx).Debugf("Propagating new %s status", status)
+	for _, fn := range b.updaters {
+		fn(upAfter)
+	}
+}
+
+// RegisterStatusUpdater adds fn to the list of hooks that are run when the
+// status of the Balancer changes.
+// Not thread safe.
+func (b *Balancer) RegisterStatusUpdater(fn func(up bool)) error {
+	if !b.wantsHealthCheck {
+		return errors.New("healthCheck not enabled in config for this weighted service")
+	}
+	b.updaters = append(b.updaters, fn)
+	return nil
+}
+
+var errNoAvailableServer = errors.New("no available server")
+
 func (b *Balancer) nextServer() (*namedHandler, error) {
 	b.mutex.Lock()
 	defer b.mutex.Unlock()
@@ -88,15 +153,24 @@ func (b *Balancer) nextServer() (*namedHandler, error) {
 	if len(b.handlers) == 0 {
 		return nil, fmt.Errorf("no servers in the pool")
 	}
+	if len(b.status) == 0 {
+		return nil, errNoAvailableServer
+	}

-	// Pick handler with closest deadline.
-	handler := heap.Pop(b).(*namedHandler)
+	var handler *namedHandler
+	for {
+		// Pick handler with closest deadline.
+		handler = heap.Pop(b).(*namedHandler)

-	// curDeadline should be handler's deadline so that new added entry would have a fair competition environment with the old ones.
-	b.curDeadline = handler.deadline
-	handler.deadline += 1 / handler.weight
+		// curDeadline should be handler's deadline so that a newly added entry competes fairly with the existing ones.
+		b.curDeadline = handler.deadline
+		handler.deadline += 1 / handler.weight

-	heap.Push(b, handler)
+		heap.Push(b, handler)
+		if _, ok := b.status[handler.name]; ok {
+			break
+		}
+	}

 	log.WithoutContext().Debugf("Service selected by WRR: %s", handler.name)
 	return handler, nil
@@ -112,17 +186,32 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 		if err == nil && cookie != nil {
 			for _, handler := range b.handlers {
-				if handler.name == cookie.Value {
-					handler.ServeHTTP(w, req)
-					return
+				if handler.name != cookie.Value {
+					continue
 				}
+
+				b.mutex.RLock()
+				_, ok := b.status[handler.name]
+				b.mutex.RUnlock()
+				if !ok {
+					// The only handler matching the cookie is unhealthy; break (rather
+					// than continue), as no other handler can match the cookie anyway.
+					break
+				}
+
+				handler.ServeHTTP(w, req)
+				return
 			}
 		}
 	}

 	server, err := b.nextServer()
 	if err != nil {
-		http.Error(w, http.StatusText(http.StatusInternalServerError)+err.Error(), http.StatusInternalServerError)
+		if errors.Is(err, errNoAvailableServer) {
+			http.Error(w, errNoAvailableServer.Error(), http.StatusServiceUnavailable)
+		} else {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+		}
 		return
 	}
@@ -135,7 +224,6 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 }

 // AddService adds a handler.
-// It is not thread safe with ServeHTTP.
 // A handler with a non-positive weight is ignored.
 func (b *Balancer) AddService(name string, handler http.Handler, weight *int) {
 	w := 1
@@ -148,10 +236,9 @@ func (b *Balancer) AddService(name string, handler http.Handler, weight *int) {

 	h := &namedHandler{Handler: handler, name: name, weight: float64(w)}

-	// use RWLock to protect b.curDeadline
-	b.mutex.RLock()
+	b.mutex.Lock()
 	h.deadline = b.curDeadline + 1/h.weight
-	b.mutex.RUnlock()
-
 	heap.Push(b, h)
+	b.status[name] = struct{}{}
+	b.mutex.Unlock()
 }
diff --git a/pkg/server/service/loadbalancer/wrr/wrr_test.go b/pkg/server/service/loadbalancer/wrr/wrr_test.go
index 1a8d85da3..ac5f7e15b 100644
--- a/pkg/server/service/loadbalancer/wrr/wrr_test.go
+++ b/pkg/server/service/loadbalancer/wrr/wrr_test.go
@@ -1,6 +1,7 @@
 package wrr

 import (
+	"context"
 	"net/http"
 	"net/http/httptest"
 	"testing"
@@ -15,16 +16,18 @@ type responseRecorder struct {
 	*httptest.ResponseRecorder
 	save     map[string]int
 	sequence []string
+	status   []int
 }

 func (r *responseRecorder) WriteHeader(statusCode int) {
 	r.save[r.Header().Get("server")]++
 	r.sequence = append(r.sequence, r.Header().Get("server"))
+	r.status = append(r.status, statusCode)
 	r.ResponseRecorder.WriteHeader(statusCode)
 }

 func TestBalancer(t *testing.T) {
-	balancer := New(nil)
+	balancer := New(nil, nil)

 	balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "first")
@@ -46,7 +49,7 @@ func TestBalancer(t *testing.T) {
 }

 func TestBalancerNoService(t *testing.T) {
-	balancer := New(nil)
+	balancer := New(nil, nil)

 	recorder := httptest.NewRecorder()
 	balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
@@ -55,7 +58,7 @@ func TestBalancerNoService(t *testing.T) {
 }

 func TestBalancerOneServerZeroWeight(t *testing.T) {
-	balancer := New(nil)
+	balancer := New(nil, nil)

 	balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "first")
@@ -72,8 +75,155 @@ func TestBalancerOneServerZeroWeight(t *testing.T) {
 	assert.Equal(t, 3, recorder.save["first"])
 }

+type key string
+
+const serviceName key = "serviceName"
+
+func TestBalancerNoServiceUp(t *testing.T) {
+	balancer := New(nil, nil)
+
+	balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.WriteHeader(http.StatusInternalServerError)
+	}), Int(1))
+
+	balancer.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.WriteHeader(http.StatusInternalServerError)
+	}), Int(1))
+
+	balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "first", false)
+	balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
+
+	recorder := httptest.NewRecorder()
+	balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+
+	assert.Equal(t, http.StatusServiceUnavailable, recorder.Result().StatusCode)
+}
+
+func TestBalancerOneServerDown(t *testing.T) {
+	balancer := New(nil, nil)
+
+	balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.Header().Set("server", "first")
+		rw.WriteHeader(http.StatusOK)
+	}), Int(1))
+
+	balancer.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.WriteHeader(http.StatusInternalServerError)
+	}), Int(1))
+	balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
+
+	recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+	for i := 0; i < 3; i++ {
+		balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+	}
+
+	assert.Equal(t, 3, recorder.save["first"])
+}
+
+func TestBalancerDownThenUp(t *testing.T) {
+	balancer := New(nil, nil)
+
+	balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.Header().Set("server", "first")
+		rw.WriteHeader(http.StatusOK)
+	}), Int(1))
+
+	balancer.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.Header().Set("server", "second")
+		rw.WriteHeader(http.StatusOK)
+	}), Int(1))
+	balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
+
+	recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+	for i := 0; i < 3; i++ {
+		balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+	}
+	assert.Equal(t, 3, recorder.save["first"])
+
+	balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", true)
+	recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+	for i := 0; i < 2; i++ {
+		balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+	}
+	assert.Equal(t, 1, recorder.save["first"])
+	assert.Equal(t, 1, recorder.save["second"])
+}
+
+func TestBalancerPropagate(t *testing.T) {
+	balancer1 := New(nil, &dynamic.HealthCheck{})
+
+	balancer1.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.Header().Set("server", "first")
+		rw.WriteHeader(http.StatusOK)
+	}), Int(1))
+	balancer1.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.Header().Set("server", "second")
+		rw.WriteHeader(http.StatusOK)
+	}), Int(1))
+
+	balancer2 := New(nil, &dynamic.HealthCheck{})
+	balancer2.AddService("third", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.Header().Set("server", "third")
+		rw.WriteHeader(http.StatusOK)
+	}), Int(1))
+	balancer2.AddService("fourth", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		rw.Header().Set("server", "fourth")
+		rw.WriteHeader(http.StatusOK)
+	}), Int(1))
+
+	topBalancer := New(nil, &dynamic.HealthCheck{})
+	topBalancer.AddService("balancer1", balancer1, Int(1))
+	_ = balancer1.RegisterStatusUpdater(func(up bool) {
+		topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer1", up)
+		// TODO(mpl): if test gets flaky, add channel or something here to signal that
+		// propagation is done, and wait on it before sending request.
+	})
+	topBalancer.AddService("balancer2", balancer2, Int(1))
+	_ = balancer2.RegisterStatusUpdater(func(up bool) {
+		topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer2", up)
+	})
+
+	recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+	for i := 0; i < 8; i++ {
+		topBalancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+	}
+	assert.Equal(t, 2, recorder.save["first"])
+	assert.Equal(t, 2, recorder.save["second"])
+	assert.Equal(t, 2, recorder.save["third"])
+	assert.Equal(t, 2, recorder.save["fourth"])
+	wantStatus := []int{200, 200, 200, 200, 200, 200, 200, 200}
+	assert.Equal(t, wantStatus, recorder.status)
+
+	// fourth gets downed, but balancer2 stays up since third is still up.
+	balancer2.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "fourth", false)
+	recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+	for i := 0; i < 8; i++ {
+		topBalancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+	}
+	assert.Equal(t, 2, recorder.save["first"])
+	assert.Equal(t, 2, recorder.save["second"])
+	assert.Equal(t, 4, recorder.save["third"])
+	assert.Equal(t, 0, recorder.save["fourth"])
+	wantStatus = []int{200, 200, 200, 200, 200, 200, 200, 200}
+	assert.Equal(t, wantStatus, recorder.status)
+
+	// third gets downed, and the propagation triggers balancer2 to be marked as
+	// down as well for topBalancer.
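+	// All the traffic should therefore be rebalanced onto balancer1's children:
+	// "first" and "second" are each expected to get half of the 8 requests below.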
+	balancer2.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "third", false)
+	recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+	for i := 0; i < 8; i++ {
+		topBalancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+	}
+	assert.Equal(t, 4, recorder.save["first"])
+	assert.Equal(t, 4, recorder.save["second"])
+	assert.Equal(t, 0, recorder.save["third"])
+	assert.Equal(t, 0, recorder.save["fourth"])
+	wantStatus = []int{200, 200, 200, 200, 200, 200, 200, 200}
+	assert.Equal(t, wantStatus, recorder.status)
+}
+
 func TestBalancerAllServersZeroWeight(t *testing.T) {
-	balancer := New(nil)
+	balancer := New(nil, nil)

 	balancer.AddService("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0))
 	balancer.AddService("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0))
@@ -87,7 +237,7 @@ func TestBalancerAllServersZeroWeight(t *testing.T) {
 func TestSticky(t *testing.T) {
 	balancer := New(&dynamic.Sticky{
 		Cookie: &dynamic.Cookie{Name: "test"},
-	})
+	}, nil)

 	balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "first")
@@ -118,7 +268,7 @@ func TestSticky(t *testing.T) {
 // TestBalancerBias makes sure that the WRR algorithm spreads elements evenly right from the start,
 // and that it does not "over-favor" the high-weighted ones with a biased start-up regime.
 func TestBalancerBias(t *testing.T) {
-	balancer := New(nil)
+	balancer := New(nil, nil)

 	balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "A")
diff --git a/pkg/server/service/service.go b/pkg/server/service/service.go
index c9e238b3f..93e6f23b5 100644
--- a/pkg/server/service/service.go
+++ b/pkg/server/service/service.go
@@ -135,7 +135,7 @@ func (m *Manager) getMirrorServiceHandler(ctx context.Context, config *dynamic.M
 	if config.MaxBodySize != nil {
 		maxBodySize = *config.MaxBodySize
 	}
-	handler := mirror.New(serviceHandler, m.routinePool, maxBodySize)
+	handler := mirror.New(serviceHandler, m.routinePool, maxBodySize, config.HealthCheck)
 	for _, mirrorConfig := range config.Mirrors {
 		mirrorHandler, err := m.BuildHTTP(ctx, mirrorConfig.Name)
 		if err != nil {
@@ -156,7 +156,7 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string,
 		config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName)
 	}

-	balancer := wrr.New(config.Sticky)
+	balancer := wrr.New(config.Sticky, config.HealthCheck)
 	for _, service := range config.Services {
 		serviceHandler, err := m.BuildHTTP(ctx, service.Name)
 		if err != nil {
@@ -164,7 +164,25 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string,
 		}

 		balancer.AddService(service.Name, serviceHandler, service.Weight)
+
+		if config.HealthCheck == nil {
+			continue
+		}
+
+		childName := service.Name
+		updater, ok := serviceHandler.(healthcheck.StatusUpdater)
+		if !ok {
+			return nil, fmt.Errorf("child service %v of %v not a healthcheck.StatusUpdater (%T)", childName, serviceName, serviceHandler)
+		}
+
+		if err := updater.RegisterStatusUpdater(func(up bool) {
+			balancer.SetStatus(ctx, childName, up)
+		}); err != nil {
+			return nil, fmt.Errorf("cannot register %v as updater for %v: %w", childName, serviceName, err)
+		}
+
+		log.FromContext(ctx).Debugf("Child service %v will update parent %v on status change", childName, serviceName)
 	}
+
 	return balancer, nil
 }
@@ -213,34 +231,30 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
 	return emptybackendhandler.New(balancer), nil
 }

-// LaunchHealthCheck Launches the health checks.
+// LaunchHealthCheck launches the health checks.
 func (m *Manager) LaunchHealthCheck() {
 	backendConfigs := make(map[string]*healthcheck.BackendConfig)

 	for serviceName, balancers := range m.balancers {
 		ctx := log.With(context.Background(), log.Str(log.ServiceName, serviceName))

-		// TODO Should all the services handle healthcheck? Handle different types
 		service := m.configs[serviceName].LoadBalancer

 		// Health Check
-		var backendHealthCheck *healthcheck.BackendConfig
-		if hcOpts := buildHealthCheckOptions(ctx, balancers, serviceName, service.HealthCheck); hcOpts != nil {
-			log.FromContext(ctx).Debugf("Setting up healthcheck for service %s with %s", serviceName, *hcOpts)
-
-			hcOpts.Transport, _ = m.roundTripperManager.Get(service.ServersTransport)
-			backendHealthCheck = healthcheck.NewBackendConfig(*hcOpts, serviceName)
+		hcOpts := buildHealthCheckOptions(ctx, balancers, serviceName, service.HealthCheck)
+		if hcOpts == nil {
+			continue
 		}
+		hcOpts.Transport, _ = m.roundTripperManager.Get(service.ServersTransport)
+		log.FromContext(ctx).Debugf("Setting up healthcheck for service %s with %s", serviceName, *hcOpts)

-		if backendHealthCheck != nil {
-			backendConfigs[serviceName] = backendHealthCheck
-		}
+		backendConfigs[serviceName] = healthcheck.NewBackendConfig(*hcOpts, serviceName)
 	}

 	healthcheck.GetHealthCheck(m.metricsRegistry).SetBackendsConfiguration(context.Background(), backendConfigs)
 }

-func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backend string, hc *dynamic.HealthCheck) *healthcheck.Options {
+func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backend string, hc *dynamic.ServerHealthCheck) *healthcheck.Options {
 	if hc == nil || hc.Path == "" {
 		return nil
 	}
@@ -295,7 +309,7 @@ func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backe
 	}
 }

-func (m *Manager) getLoadBalancer(ctx context.Context, serviceName string, service *dynamic.ServersLoadBalancer, fwd http.Handler) (healthcheck.BalancerHandler, error) {
+func (m *Manager) getLoadBalancer(ctx context.Context, serviceName string, service *dynamic.ServersLoadBalancer, fwd http.Handler) (healthcheck.BalancerStatusHandler, error) {
 	logger := log.FromContext(ctx)
 	logger.Debug("Creating load-balancer")
@@ -327,7 +341,7 @@ func (m *Manager) getLoadBalancer(ctx context.Context, serviceName string, servi
 		return nil, err
 	}

-	lbsu := healthcheck.NewLBStatusUpdater(lb, m.configs[serviceName])
+	lbsu := healthcheck.NewLBStatusUpdater(lb, m.configs[serviceName], service.HealthCheck)
 	if err := m.upsertServers(ctx, lbsu, service.Servers); err != nil {
 		return nil, fmt.Errorf("error configuring load balancer for service %s: %w", serviceName, err)
 	}
diff --git a/pkg/server/service/service_test.go b/pkg/server/service/service_test.go
index 49fd7a1be..f479263ad 100644
--- a/pkg/server/service/service_test.go
+++ b/pkg/server/service/service_test.go
@@ -241,7 +241,7 @@ func TestGetLoadBalancerServiceHandler(t *testing.T) {
 			},
 		},
 		{
-			desc:        "PassHost doesn't passe the host instead of the IP",
+			desc:        "PassHost doesn't pass the host instead of the IP",
 			serviceName: "test",
 			service: &dynamic.ServersLoadBalancer{
 				PassHostHeader: Bool(false),
@@ -406,5 +406,3 @@ func TestMultipleTypeOnBuildHTTP(t *testing.T) {
 	_, err := manager.BuildHTTP(context.Background(), "test@file")
 	assert.Error(t, err, "cannot create service: multi-types service not supported, consider declaring two different pieces of service instead")
 }
-
-// FIXME Add healthcheck tests
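Note on usage (not part of the patch): the propagation chain that getWRRServiceHandler
sets up can be reduced to the following minimal sketch. It only uses the wrr package
API introduced above; the intPtr helper and the server names are local to the example,
and stand in for what the Traefik service manager normally wires up from the dynamic
configuration.

	package main

	import (
		"context"
		"net/http"

		"github.com/traefik/traefik/v2/pkg/config/dynamic"
		"github.com/traefik/traefik/v2/pkg/server/service/loadbalancer/wrr"
	)

	func intPtr(i int) *int { return &i }

	func main() {
		// Child balancer with healthCheck enabled, so that updaters may be
		// registered on it.
		child := wrr.New(nil, &dynamic.HealthCheck{})
		child.AddService("server-1", http.NotFoundHandler(), intPtr(1))

		// Parent balancer that load-balances over the child.
		parent := wrr.New(nil, &dynamic.HealthCheck{})
		parent.AddService("child", child, intPtr(1))

		// Propagation hook: when the child's last healthy server goes down (or
		// its first one comes back up), the parent is notified and rebalances.
		_ = child.RegisterStatusUpdater(func(up bool) {
			parent.SetStatus(context.Background(), "child", up)
		})

		// Downing "server-1" empties the child's status set, which fires the
		// updater above; the parent then answers 503 for requests it cannot route.
		child.SetStatus(context.Background(), "server-1", false)
	}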