Healthcheck managed for all related services
Co-authored-by: Mathieu Lonjaret <mathieu.lonjaret@gmail.com>
This commit is contained in:
parent
a87c104172
commit
efcc9d51d4
4 changed files with 173 additions and 25 deletions
|
@ -0,0 +1,40 @@
|
||||||
|
[global]
|
||||||
|
checkNewVersion = false
|
||||||
|
sendAnonymousUsage = false
|
||||||
|
|
||||||
|
[log]
|
||||||
|
level = "DEBUG"
|
||||||
|
|
||||||
|
[entryPoints]
|
||||||
|
[entryPoints.web1]
|
||||||
|
address = ":8000"
|
||||||
|
[entryPoints.web2]
|
||||||
|
address = ":9000"
|
||||||
|
|
||||||
|
[api]
|
||||||
|
insecure = true
|
||||||
|
|
||||||
|
[providers.file]
|
||||||
|
filename = "{{ .SelfFilename }}"
|
||||||
|
|
||||||
|
## dynamic configuration ##
|
||||||
|
|
||||||
|
[http.routers]
|
||||||
|
[http.routers.router1]
|
||||||
|
entryPoints = ["web1"]
|
||||||
|
service = "service1"
|
||||||
|
rule = "Host(`test.localhost`)"
|
||||||
|
|
||||||
|
[http.routers.router2]
|
||||||
|
entryPoints = ["web2"]
|
||||||
|
service = "service1"
|
||||||
|
rule = "Host(`test.localhost`)"
|
||||||
|
|
||||||
|
[http.services]
|
||||||
|
[http.services.service1.loadBalancer]
|
||||||
|
[http.services.service1.loadBalancer.healthcheck]
|
||||||
|
path = "/health"
|
||||||
|
interval = "1s"
|
||||||
|
timeout = "0.9s"
|
||||||
|
[[http.services.service1.loadBalancer.servers]]
|
||||||
|
url = "http://{{.Server1}}:80"
|
|
@ -205,3 +205,69 @@ func (s *HealthCheckSuite) TestPortOverload(c *check.C) {
|
||||||
err = try.Request(frontendHealthReq, 3*time.Second, try.StatusCodeIs(http.StatusServiceUnavailable))
|
err = try.Request(frontendHealthReq, 3*time.Second, try.StatusCodeIs(http.StatusServiceUnavailable))
|
||||||
c.Assert(err, checker.IsNil)
|
c.Assert(err, checker.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Checks if all the loadbalancers created will correctly update the server status
|
||||||
|
func (s *HealthCheckSuite) TestMultipleRoutersOnSameService(c *check.C) {
|
||||||
|
file := s.adaptFile(c, "fixtures/healthcheck/multiple-routers-one-same-service.toml", struct {
|
||||||
|
Server1 string
|
||||||
|
}{s.whoami1IP})
|
||||||
|
defer os.Remove(file)
|
||||||
|
|
||||||
|
cmd, display := s.traefikCmd(withConfigFile(file))
|
||||||
|
defer display(c)
|
||||||
|
err := cmd.Start()
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
defer cmd.Process.Kill()
|
||||||
|
|
||||||
|
// wait for traefik
|
||||||
|
err = try.GetRequest("http://127.0.0.1:8080/api/rawdata", 60*time.Second, try.BodyContains("Host(`test.localhost`)"))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Set whoami health to 200 to be sure to start with the wanted status
|
||||||
|
client := &http.Client{}
|
||||||
|
statusOkReq, err := http.NewRequest(http.MethodPost, "http://"+s.whoami1IP+"/health", bytes.NewBuffer([]byte("200")))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
_, err = client.Do(statusOkReq)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// check healthcheck on web1 entrypoint
|
||||||
|
healthReqWeb1, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/health", nil)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
healthReqWeb1.Host = "test.localhost"
|
||||||
|
err = try.Request(healthReqWeb1, 1*time.Second, try.StatusCodeIs(http.StatusOK))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// check healthcheck on web2 entrypoint
|
||||||
|
healthReqWeb2, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:9000/health", nil)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
healthReqWeb2.Host = "test.localhost"
|
||||||
|
|
||||||
|
err = try.Request(healthReqWeb2, 500*time.Millisecond, try.StatusCodeIs(http.StatusOK))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Set whoami health to 500
|
||||||
|
statusInternalServerErrorReq, err := http.NewRequest(http.MethodPost, "http://"+s.whoami1IP+"/health", bytes.NewBuffer([]byte("500")))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
_, err = client.Do(statusInternalServerErrorReq)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Verify no backend service is available due to failing health checks
|
||||||
|
err = try.Request(healthReqWeb1, 3*time.Second, try.StatusCodeIs(http.StatusServiceUnavailable))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
err = try.Request(healthReqWeb2, 3*time.Second, try.StatusCodeIs(http.StatusServiceUnavailable))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Change one whoami health to 200
|
||||||
|
statusOKReq1, err := http.NewRequest(http.MethodPost, "http://"+s.whoami1IP+"/health", bytes.NewBuffer([]byte("200")))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
_, err = client.Do(statusOKReq1)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// Verify health check
|
||||||
|
err = try.Request(healthReqWeb1, 3*time.Second, try.StatusCodeIs(http.StatusOK))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
err = try.Request(healthReqWeb2, 3*time.Second, try.StatusCodeIs(http.StatusOK))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
}
|
||||||
|
|
|
@ -25,14 +25,20 @@ const (
|
||||||
var singleton *HealthCheck
|
var singleton *HealthCheck
|
||||||
var once sync.Once
|
var once sync.Once
|
||||||
|
|
||||||
// BalancerHandler includes functionality for load-balancing management.
|
// Balancer is the set of operations required to manage the list of servers in a
|
||||||
type BalancerHandler interface {
|
// load-balancer.
|
||||||
ServeHTTP(w http.ResponseWriter, req *http.Request)
|
type Balancer interface {
|
||||||
Servers() []*url.URL
|
Servers() []*url.URL
|
||||||
RemoveServer(u *url.URL) error
|
RemoveServer(u *url.URL) error
|
||||||
UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error
|
UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BalancerHandler includes functionality for load-balancing management.
|
||||||
|
type BalancerHandler interface {
|
||||||
|
ServeHTTP(w http.ResponseWriter, req *http.Request)
|
||||||
|
Balancer
|
||||||
|
}
|
||||||
|
|
||||||
// metricsRegistry is a local interface in the health check package, exposing only the required metrics
|
// metricsRegistry is a local interface in the health check package, exposing only the required metrics
|
||||||
// necessary for the health check package. This makes it easier for the tests.
|
// necessary for the health check package. This makes it easier for the tests.
|
||||||
type metricsRegistry interface {
|
type metricsRegistry interface {
|
||||||
|
@ -49,7 +55,7 @@ type Options struct {
|
||||||
Transport http.RoundTripper
|
Transport http.RoundTripper
|
||||||
Interval time.Duration
|
Interval time.Duration
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
LB BalancerHandler
|
LB Balancer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt Options) String() string {
|
func (opt Options) String() string {
|
||||||
|
@ -146,18 +152,18 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig)
|
||||||
enabledURLs := backend.LB.Servers()
|
enabledURLs := backend.LB.Servers()
|
||||||
var newDisabledURLs []backendURL
|
var newDisabledURLs []backendURL
|
||||||
// FIXME re enable metrics
|
// FIXME re enable metrics
|
||||||
for _, disableURL := range backend.disabledURLs {
|
for _, disabledURL := range backend.disabledURLs {
|
||||||
// FIXME serverUpMetricValue := float64(0)
|
// FIXME serverUpMetricValue := float64(0)
|
||||||
if err := checkHealth(disableURL.url, backend); err == nil {
|
if err := checkHealth(disabledURL.url, backend); err == nil {
|
||||||
logger.Warnf("Health check up: Returning to server list. Backend: %q URL: %q Weight: %d",
|
logger.Warnf("Health check up: Returning to server list. Backend: %q URL: %q Weight: %d",
|
||||||
backend.name, disableURL.url.String(), disableURL.weight)
|
backend.name, disabledURL.url.String(), disabledURL.weight)
|
||||||
if err = backend.LB.UpsertServer(disableURL.url, roundrobin.Weight(disableURL.weight)); err != nil {
|
if err = backend.LB.UpsertServer(disabledURL.url, roundrobin.Weight(disabledURL.weight)); err != nil {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
}
|
}
|
||||||
// FIXME serverUpMetricValue = 1
|
// FIXME serverUpMetricValue = 1
|
||||||
} else {
|
} else {
|
||||||
logger.Warnf("Health check still failing. Backend: %q URL: %q Reason: %s", backend.name, disableURL.url.String(), err)
|
logger.Warnf("Health check still failing. Backend: %q URL: %q Reason: %s", backend.name, disabledURL.url.String(), err)
|
||||||
newDisabledURLs = append(newDisabledURLs, disableURL)
|
newDisabledURLs = append(newDisabledURLs, disabledURL)
|
||||||
}
|
}
|
||||||
// FIXME labelValues := []string{"backend", backend.name, "url", backendurl.url.String()}
|
// FIXME labelValues := []string{"backend", backend.name, "url", backendurl.url.String()}
|
||||||
// FIXME hc.metrics.BackendServerUpGauge().With(labelValues...).Set(serverUpMetricValue)
|
// FIXME hc.metrics.BackendServerUpGauge().With(labelValues...).Set(serverUpMetricValue)
|
||||||
|
@ -177,7 +183,7 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig)
|
||||||
weight = 1
|
weight = 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.Warnf("Health check failed: Remove from server list. Backend: %q URL: %q Weight: %d Reason: %s", backend.name, enableURL.String(), weight, err)
|
logger.Warnf("Health check failed, removing from server list. Backend: %q URL: %q Weight: %d Reason: %s", backend.name, enableURL.String(), weight, err)
|
||||||
if err := backend.LB.RemoveServer(enableURL); err != nil {
|
if err := backend.LB.RemoveServer(enableURL); err != nil {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
}
|
}
|
||||||
|
@ -281,3 +287,38 @@ func (lb *LbStatusUpdater) UpsertServer(u *url.URL, options ...roundrobin.Server
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Balancers is a list of Balancers(s) that implements the Balancer interface.
|
||||||
|
type Balancers []Balancer
|
||||||
|
|
||||||
|
// Servers returns the servers url from all the BalancerHandler
|
||||||
|
func (b Balancers) Servers() []*url.URL {
|
||||||
|
var servers []*url.URL
|
||||||
|
for _, lb := range b {
|
||||||
|
servers = append(servers, lb.Servers()...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return servers
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveServer removes the given server from all the BalancerHandler,
|
||||||
|
// and updates the status of the server to "DOWN".
|
||||||
|
func (b Balancers) RemoveServer(u *url.URL) error {
|
||||||
|
for _, lb := range b {
|
||||||
|
if err := lb.RemoveServer(u); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpsertServer adds the given server to all the BalancerHandler,
|
||||||
|
// and updates the status of the server to "UP".
|
||||||
|
func (b Balancers) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error {
|
||||||
|
for _, lb := range b {
|
||||||
|
if err := lb.UpsertServer(u, options...); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ func NewManager(configs map[string]*runtime.ServiceInfo, defaultRoundTripper htt
|
||||||
metricsRegistry: metricsRegistry,
|
metricsRegistry: metricsRegistry,
|
||||||
bufferPool: newBufferPool(),
|
bufferPool: newBufferPool(),
|
||||||
defaultRoundTripper: defaultRoundTripper,
|
defaultRoundTripper: defaultRoundTripper,
|
||||||
balancers: make(map[string][]healthcheck.BalancerHandler),
|
balancers: make(map[string]healthcheck.Balancers),
|
||||||
configs: configs,
|
configs: configs,
|
||||||
api: api,
|
api: api,
|
||||||
rest: rest,
|
rest: rest,
|
||||||
|
@ -53,7 +53,11 @@ type Manager struct {
|
||||||
metricsRegistry metrics.Registry
|
metricsRegistry metrics.Registry
|
||||||
bufferPool httputil.BufferPool
|
bufferPool httputil.BufferPool
|
||||||
defaultRoundTripper http.RoundTripper
|
defaultRoundTripper http.RoundTripper
|
||||||
balancers map[string][]healthcheck.BalancerHandler
|
// balancers is the map of all Balancers, keyed by service name.
|
||||||
|
// There is one Balancer per service handler, and there is one service handler per reference to a service
|
||||||
|
// (e.g. if 2 routers refer to the same service name, 2 service handlers are created),
|
||||||
|
// which is why there is not just one Balancer per service name.
|
||||||
|
balancers map[string]healthcheck.Balancers
|
||||||
configs map[string]*runtime.ServiceInfo
|
configs map[string]*runtime.ServiceInfo
|
||||||
api http.Handler
|
api http.Handler
|
||||||
rest http.Handler
|
rest http.Handler
|
||||||
|
@ -110,14 +114,14 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons
|
||||||
}
|
}
|
||||||
case conf.Weighted != nil:
|
case conf.Weighted != nil:
|
||||||
var err error
|
var err error
|
||||||
lb, err = m.getLoadBalancerWRRServiceHandler(ctx, serviceName, conf.Weighted, responseModifier)
|
lb, err = m.getWRRServiceHandler(ctx, serviceName, conf.Weighted, responseModifier)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conf.AddError(err, true)
|
conf.AddError(err, true)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
case conf.Mirroring != nil:
|
case conf.Mirroring != nil:
|
||||||
var err error
|
var err error
|
||||||
lb, err = m.getLoadBalancerMirrorServiceHandler(ctx, serviceName, conf.Mirroring, responseModifier)
|
lb, err = m.getMirrorServiceHandler(ctx, serviceName, conf.Mirroring, responseModifier)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conf.AddError(err, true)
|
conf.AddError(err, true)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -131,7 +135,7 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons
|
||||||
return lb, nil
|
return lb, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) getLoadBalancerMirrorServiceHandler(ctx context.Context, serviceName string, config *dynamic.Mirroring, responseModifier func(*http.Response) error) (http.Handler, error) {
|
func (m *Manager) getMirrorServiceHandler(ctx context.Context, serviceName string, config *dynamic.Mirroring, responseModifier func(*http.Response) error) (http.Handler, error) {
|
||||||
serviceHandler, err := m.BuildHTTP(ctx, config.Service, responseModifier)
|
serviceHandler, err := m.BuildHTTP(ctx, config.Service, responseModifier)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -152,7 +156,7 @@ func (m *Manager) getLoadBalancerMirrorServiceHandler(ctx context.Context, servi
|
||||||
return handler, nil
|
return handler, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) getLoadBalancerWRRServiceHandler(ctx context.Context, serviceName string, config *dynamic.WeightedRoundRobin, responseModifier func(*http.Response) error) (http.Handler, error) {
|
func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string, config *dynamic.WeightedRoundRobin, responseModifier func(*http.Response) error) (http.Handler, error) {
|
||||||
// TODO Handle accesslog and metrics with multiple service name
|
// TODO Handle accesslog and metrics with multiple service name
|
||||||
if config.Sticky != nil && config.Sticky.Cookie != nil {
|
if config.Sticky != nil && config.Sticky.Cookie != nil {
|
||||||
config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName)
|
config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName)
|
||||||
|
@ -218,15 +222,12 @@ func (m *Manager) LaunchHealthCheck() {
|
||||||
for serviceName, balancers := range m.balancers {
|
for serviceName, balancers := range m.balancers {
|
||||||
ctx := log.With(context.Background(), log.Str(log.ServiceName, serviceName))
|
ctx := log.With(context.Background(), log.Str(log.ServiceName, serviceName))
|
||||||
|
|
||||||
// TODO aggregate
|
|
||||||
balancer := balancers[0]
|
|
||||||
|
|
||||||
// TODO Should all the services handle healthcheck? Handle different types
|
// TODO Should all the services handle healthcheck? Handle different types
|
||||||
service := m.configs[serviceName].LoadBalancer
|
service := m.configs[serviceName].LoadBalancer
|
||||||
|
|
||||||
// Health Check
|
// Health Check
|
||||||
var backendHealthCheck *healthcheck.BackendConfig
|
var backendHealthCheck *healthcheck.BackendConfig
|
||||||
if hcOpts := buildHealthCheckOptions(ctx, balancer, serviceName, service.HealthCheck); hcOpts != nil {
|
if hcOpts := buildHealthCheckOptions(ctx, balancers, serviceName, service.HealthCheck); hcOpts != nil {
|
||||||
log.FromContext(ctx).Debugf("Setting up healthcheck for service %s with %s", serviceName, *hcOpts)
|
log.FromContext(ctx).Debugf("Setting up healthcheck for service %s with %s", serviceName, *hcOpts)
|
||||||
|
|
||||||
hcOpts.Transport = m.defaultRoundTripper
|
hcOpts.Transport = m.defaultRoundTripper
|
||||||
|
@ -242,7 +243,7 @@ func (m *Manager) LaunchHealthCheck() {
|
||||||
healthcheck.GetHealthCheck().SetBackendsConfiguration(context.Background(), backendConfigs)
|
healthcheck.GetHealthCheck().SetBackendsConfiguration(context.Background(), backendConfigs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildHealthCheckOptions(ctx context.Context, lb healthcheck.BalancerHandler, backend string, hc *dynamic.HealthCheck) *healthcheck.Options {
|
func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backend string, hc *dynamic.HealthCheck) *healthcheck.Options {
|
||||||
if hc == nil || hc.Path == "" {
|
if hc == nil || hc.Path == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue