From ea3510d1f33e126addc77b8d0d64e78993469ea3 Mon Sep 17 00:00:00 2001
From: Timo Reimann
Date: Fri, 18 Aug 2017 03:08:03 +0200
Subject: [PATCH] Add support for readiness checks.

---
 docs/toml.md                        |  11 ++
 docs/user-guide/marathon.md         |  10 +-
 provider/marathon/builder_test.go   |  62 +++++++-
 provider/marathon/marathon.go       |  27 +++-
 provider/marathon/marathon_test.go  | 223 +++++++++++++++-------------
 provider/marathon/readiness.go      | 122 +++++++++++++++
 provider/marathon/readiness_test.go | 134 +++++++++++++++++
 traefik.sample.toml                 |  11 ++
 8 files changed, 483 insertions(+), 117 deletions(-)
 create mode 100644 provider/marathon/readiness.go
 create mode 100644 provider/marathon/readiness_test.go

diff --git a/docs/toml.md b/docs/toml.md
index 119fe5f21..205fdba95 100644
--- a/docs/toml.md
+++ b/docs/toml.md
@@ -1083,6 +1083,17 @@ domain = "marathon.localhost"
 # Default: false
 #
 # forceTaskHostname: false
+
+# Applications may define readiness checks which Marathon probes periodically
+# during deployments, exposing the results via the API. Enabling the following
+# parameter causes Traefik to filter out tasks whose readiness checks have
+# not succeeded.
+# Note that the checks are only evaluated during deployments. See the Marathon
+# guide for details.
+#
+# Optional
+# Default: false
+# respectReadinessChecks: false
 ```

 Labels can be used on containers to override default behaviour:
diff --git a/docs/user-guide/marathon.md b/docs/user-guide/marathon.md
index 827b022ab..2a10922c5 100644
--- a/docs/user-guide/marathon.md
+++ b/docs/user-guide/marathon.md
@@ -40,11 +40,11 @@ The following sub-sections describe how to resolve or mitigate each scenario.
 
 ### Startup
 
-In general, it is possible to define [readiness checks](https://mesosphere.github.io/marathon/docs/readiness-checks.html) (available since Marathon version 1.1) per application and have Marathon take these into account during the startup phase. The idea is that each application provides an HTTP endpoint that Marathon queries periodically during an ongoing deployment in order to mark the associated readiness check result as successful if and only if the endpoint returns a response within the configured HTTP code range. As long as the check keeps failing, Marathon will not proceed with the deployment (within the configured upgrade stategy bounds).
+It is possible to define [readiness checks](https://mesosphere.github.io/marathon/docs/readiness-checks.html) (available since Marathon version 1.1) per application and have Marathon take these into account during the startup phase. The idea is that each application provides an HTTP endpoint that Marathon queries periodically during an ongoing deployment in order to mark the associated readiness check result as successful if and only if the endpoint returns a response within the configured HTTP code range. As long as the check keeps failing, Marathon will not proceed with the deployment (within the configured upgrade strategy bounds).
 
-Unfortunately, Traefik does not respect the result of the readiness check yet. Support is expected to land in a not-too-distant future release of Traefik, however, as being tracked by [issue 1559](https://github.com/containous/traefik/issues/1559).
+Beginning with version 1.4, Traefik respects readiness check results if the corresponding option is enabled and readiness checks are configured on the applications. Note that due to the way readiness check results are currently exposed by the Marathon API, ready tasks may be taken into rotation with a small delay. It is on the order of one readiness check timeout interval (as configured on the application specification) and guarantees that non-ready tasks do not receive traffic prematurely.
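+
+To illustrate the filtering decision, the following condensed Go sketch (simplified from `provider/marathon/readiness.go`; the helper name is illustrative, and the full implementation additionally handles pre-existing tasks during rolling upgrades) shows how readiness is derived from the check results exposed by the Marathon API:
+
+```go
+import marathon "github.com/gambol99/go-marathon"
+
+// taskReady reports the readiness Marathon exposes for a given task,
+// defaulting to unready while a deployment is in flight and no check
+// result has been published for the task yet.
+func taskReady(taskID string, results []marathon.ReadinessCheckResult) bool {
+	for _, result := range results {
+		if result.TaskID == taskID {
+			return result.Ready
+		}
+	}
+	// No result yet: err on the side of caution.
+	return false
+}
+```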
 
-A current mitigation strategy is to enable [retries](http://docs.traefik.io/toml/#retry-configuration) and make sure that a sufficient number of healthy application tasks exist so that one retry will likely hit one of those. Apart from its probabilistic nature, the workaround comes at the price of increased latency.
+If readiness checks are not possible, a mitigation strategy is to enable [retries](http://docs.traefik.io/toml/#retry-configuration) and make sure that a sufficient number of healthy application tasks exist so that one retry will likely hit one of those. Apart from its probabilistic nature, the workaround comes at the price of increased latency.
 
 ### Shutdown
 
@@ -81,11 +81,11 @@ There are a few alternatives of varying quality that are frequently asked for. T
 
 It may seem obvious to reuse the Marathon health checks as a signal to Traefik whether an application should be taken into load-balancing rotation or not.
 
-Apart from the increased latency a failing health check may have, a major problem with this is is that Marathon does not persist the health check results. Consequently, if a master re-election occurs in the Marathon clusters, all health check results will revert to the _unknown_ state, effectively causing all applications inside the cluster to become unavailable and leading to a complete cluster failure. Re-elections do not only happen during regular maintenance work (often requiring rolling upgrades of the Marathon nodes) but also when the Marathon leader fails spontaneously). As such, there is no way to handle this situation deterministically.
+Apart from the increased latency a failing health check may have, a major problem with this is that Marathon does not persist the health check results. Consequently, if a master re-election occurs in the Marathon cluster, all health check results will revert to the _unknown_ state, effectively causing all applications inside the cluster to become unavailable and leading to a complete cluster failure. Re-elections happen not only during regular maintenance work (often requiring rolling upgrades of the Marathon nodes) but also when the Marathon leader fails spontaneously. As such, there is no way to handle this situation deterministically.
 
 Finally, Marathon health checks are not mandatory (the default is to use the task state as reported by Mesos), so requiring them for Traefik would raise the entry barrier for Marathon users.
 
-Traefik used to use the health check results but moved away from it as [users reported the dramatic consequences](https://github.com/containous/traefik/issues/653).
+Traefik used to use the health check results as a strict requirement but moved away from this behavior as [users reported the dramatic consequences](https://github.com/containous/traefik/issues/653). If health check results are known to exist, however, they will be used to signal task availability.
 
 ### Draining
 
diff --git a/provider/marathon/builder_test.go b/provider/marathon/builder_test.go
index 5f226a584..aa993415a 100644
--- a/provider/marathon/builder_test.go
+++ b/provider/marathon/builder_test.go
@@ -1,12 +1,21 @@
 package marathon
 
-import "github.com/gambol99/go-marathon"
+import (
+	"time"
+
+	"github.com/gambol99/go-marathon"
+)
+
+const testTaskName string = "taskID"
 
 // Functions related to building applications.
-func createApplication(ops ...func(*marathon.Application)) marathon.Application { +func application(ops ...func(*marathon.Application)) marathon.Application { app := marathon.Application{} app.EmptyLabels() + app.Deployments = []map[string]string{} + app.ReadinessChecks = &[]marathon.ReadinessCheck{} + app.ReadinessCheckResults = &[]marathon.ReadinessCheckResult{} for _, op := range ops { op(&app) @@ -63,10 +72,41 @@ func ipAddrPerTask(port int) func(*marathon.Application) { } } +func deployments(ids ...string) func(*marathon.Application) { + return func(app *marathon.Application) { + for _, id := range ids { + app.Deployments = append(app.Deployments, map[string]string{ + "ID": id, + }) + } + } +} + +func readinessCheck(timeout time.Duration) func(*marathon.Application) { + return func(app *marathon.Application) { + app.ReadinessChecks = &[]marathon.ReadinessCheck{ + { + Path: "/ready", + TimeoutSeconds: int(timeout.Seconds()), + }, + } + } +} + +func readinessCheckResult(taskID string, ready bool) func(*marathon.Application) { + return func(app *marathon.Application) { + *app.ReadinessCheckResults = append(*app.ReadinessCheckResults, marathon.ReadinessCheckResult{ + TaskID: taskID, + Ready: ready, + }) + } +} + // Functions related to building tasks. -func createTask(ops ...func(*marathon.Task)) marathon.Task { +func task(ops ...func(*marathon.Task)) marathon.Task { t := marathon.Task{ + ID: testTaskName, // The vast majority of tests expect the task state to be TASK_RUNNING. State: string(taskStateRunning), } @@ -78,8 +118,8 @@ func createTask(ops ...func(*marathon.Task)) marathon.Task { return t } -func createLocalhostTask(ops ...func(*marathon.Task)) marathon.Task { - t := createTask( +func localhostTask(ops ...func(*marathon.Task)) marathon.Task { + t := task( host("localhost"), ipAddresses("127.0.0.1"), ) @@ -129,3 +169,15 @@ func healthCheckResultLiveness(alive ...bool) func(*marathon.Task) { } } } + +func startedAt(timestamp string) func(*marathon.Task) { + return func(t *marathon.Task) { + t.StartedAt = timestamp + } +} + +func startedAtFromNow(offset time.Duration) func(*marathon.Task) { + return func(t *marathon.Task) { + t.StartedAt = time.Now().Add(-offset).Format(time.RFC3339) + } +} diff --git a/provider/marathon/marathon.go b/provider/marathon/marathon.go index b1fb869eb..c493a61e9 100644 --- a/provider/marathon/marathon.go +++ b/provider/marathon/marathon.go @@ -26,6 +26,13 @@ import ( const ( traceMaxScanTokenSize = 1024 * 1024 + marathonEventIDs = marathon.EventIDApplications | + marathon.EventIDAddHealthCheck | + marathon.EventIDDeploymentSuccess | + marathon.EventIDDeploymentFailed | + marathon.EventIDDeploymentInfo | + marathon.EventIDDeploymentStepSuccess | + marathon.EventIDDeploymentStepFailed ) // TaskState denotes the Mesos state a task can have. 
@@ -52,6 +59,8 @@ type Provider struct { KeepAlive flaeg.Duration `description:"Set a non-default TCP Keep Alive time in seconds"` ForceTaskHostname bool `description:"Force to use the task's hostname."` Basic *Basic `description:"Enable basic authentication"` + RespectReadinessChecks bool `description:"Filter out tasks with non-successful readiness checks during deployments"` + readyChecker *readinessChecker marathonClient marathon.Marathon } @@ -80,6 +89,13 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s config.HTTPBasicAuthUser = p.Basic.HTTPBasicAuthUser config.HTTPBasicPassword = p.Basic.HTTPBasicPassword } + var rc *readinessChecker + if p.RespectReadinessChecks { + log.Debug("Enabling Marathon readiness checker") + rc = defaultReadinessChecker(p.Trace) + } + p.readyChecker = rc + if len(p.DCOSToken) > 0 { config.DCOSToken = p.DCOSToken } @@ -104,7 +120,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s p.marathonClient = client if p.Watch { - update, err := client.AddEventsListener(marathon.EventIDApplications) + update, err := client.AddEventsListener(marathonEventIDs) if err != nil { log.Errorf("Failed to register for events, %s", err) return err @@ -116,7 +132,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s case <-stop: return case event := <-update: - log.Debug("Provider event received", event) + log.Debugf("Received provider event %s", event) configuration := p.loadMarathonConfig() if configuration != nil { configurationChan <- types.ConfigMessage{ @@ -175,6 +191,8 @@ func (p *Provider) loadMarathonConfig() *types.Configuration { v := url.Values{} v.Add("embed", "apps.tasks") + v.Add("embed", "apps.deployments") + v.Add("embed", "apps.readiness") applications, err := p.marathonClient.Applications(v) if err != nil { log.Errorf("Failed to retrieve Marathon applications: %s", err) @@ -258,6 +276,11 @@ func (p *Provider) taskFilter(task marathon.Task, application marathon.Applicati } } + if ready := p.readyChecker.Do(task, application); !ready { + log.Infof("Filtering unready task %s from application %s", task.ID, application.ID) + return false + } + return true } diff --git a/provider/marathon/marathon_test.go b/provider/marathon/marathon_test.go index 0673036d7..cd7fee449 100644 --- a/provider/marathon/marathon_test.go +++ b/provider/marathon/marathon_test.go @@ -52,8 +52,8 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) { }{ { desc: "simple application", - application: createApplication(appPorts(80)), - task: createLocalhostTask(taskPorts(80)), + application: application(appPorts(80)), + task: localhostTask(taskPorts(80)), expectedFrontends: map[string]*types.Frontend{ "frontend-app": { Backend: "backend-app", @@ -78,8 +78,8 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) { }, { desc: "filtered task", - application: createApplication(appPorts(80)), - task: createLocalhostTask( + application: application(appPorts(80)), + task: localhostTask( taskPorts(80), state(taskStateStaging), ), @@ -97,12 +97,12 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) { }, { desc: "load balancer / circuit breaker labels", - application: createApplication( + application: application( appPorts(80), label(types.LabelBackendLoadbalancerMethod, "drr"), label(types.LabelBackendCircuitbreakerExpression, "NetworkErrorRatio() > 0.5"), ), - task: createLocalhostTask(taskPorts(80)), + task: localhostTask(taskPorts(80)), expectedFrontends: map[string]*types.Frontend{ 
"frontend-app": { Backend: "backend-app", @@ -132,12 +132,12 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) { }, { desc: "general max connection labels", - application: createApplication( + application: application( appPorts(80), label(types.LabelBackendMaxconnAmount, "1000"), label(types.LabelBackendMaxconnExtractorfunc, "client.ip"), ), - task: createLocalhostTask(taskPorts(80)), + task: localhostTask(taskPorts(80)), expectedFrontends: map[string]*types.Frontend{ "frontend-app": { Backend: "backend-app", @@ -165,11 +165,11 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) { }, { desc: "max connection amount label only", - application: createApplication( + application: application( appPorts(80), label(types.LabelBackendMaxconnAmount, "1000"), ), - task: createLocalhostTask(taskPorts(80)), + task: localhostTask(taskPorts(80)), expectedFrontends: map[string]*types.Frontend{ "frontend-app": { Backend: "backend-app", @@ -194,11 +194,11 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) { }, { desc: "max connection extractor function label only", - application: createApplication( + application: application( appPorts(80), label(types.LabelBackendMaxconnExtractorfunc, "client.ip"), ), - task: createLocalhostTask(taskPorts(80)), + task: localhostTask(taskPorts(80)), expectedFrontends: map[string]*types.Frontend{ "frontend-app": { Backend: "backend-app", @@ -223,12 +223,12 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) { }, { desc: "health check labels", - application: createApplication( + application: application( appPorts(80), label(types.LabelBackendHealthcheckPath, "/path"), label(types.LabelBackendHealthcheckInterval, "5m"), ), - task: createTask( + task: task( host("127.0.0.1"), taskPorts(80), ), @@ -298,36 +298,37 @@ func TestMarathonLoadConfigNonAPIErrors(t *testing.T) { func TestMarathonTaskFilter(t *testing.T) { cases := []struct { - desc string - task marathon.Task - application marathon.Application - expected bool + desc string + task marathon.Task + application marathon.Application + readyChecker *readinessChecker + expected bool }{ { desc: "missing port", - task: createTask(), - application: createApplication(), + task: task(), + application: application(), expected: false, }, { desc: "task not running", - task: createTask( + task: task( taskPorts(80), state(taskStateStaging), ), - application: createApplication(appPorts(80)), + application: application(appPorts(80)), expected: false, }, { desc: "existing port", - task: createTask(taskPorts(80)), - application: createApplication(appPorts(80)), + task: task(taskPorts(80)), + application: application(appPorts(80)), expected: true, }, { desc: "ambiguous port specification", - task: createTask(taskPorts(80, 443)), - application: createApplication( + task: task(taskPorts(80, 443)), + application: application( appPorts(80, 443), label(types.LabelPort, "443"), label(types.LabelPortIndex, "1"), @@ -336,8 +337,8 @@ func TestMarathonTaskFilter(t *testing.T) { }, { desc: "healthcheck available", - task: createTask(taskPorts(80)), - application: createApplication( + task: task(taskPorts(80)), + application: application( appPorts(80), healthChecks(marathon.NewDefaultHealthCheck()), ), @@ -345,11 +346,11 @@ func TestMarathonTaskFilter(t *testing.T) { }, { desc: "healthcheck result false", - task: createTask( + task: task( taskPorts(80), healthCheckResultLiveness(false), ), - application: createApplication( + application: application( appPorts(80), healthChecks(marathon.NewDefaultHealthCheck()), ), @@ -357,11 
+358,11 @@ func TestMarathonTaskFilter(t *testing.T) { }, { desc: "healthcheck results mixed", - task: createTask( + task: task( taskPorts(80), healthCheckResultLiveness(true, false), ), - application: createApplication( + application: application( appPorts(80), healthChecks(marathon.NewDefaultHealthCheck()), ), @@ -369,23 +370,35 @@ func TestMarathonTaskFilter(t *testing.T) { }, { desc: "healthcheck result true", - task: createTask( + task: task( taskPorts(80), healthCheckResultLiveness(true), ), - application: createApplication( + application: application( appPorts(80), healthChecks(marathon.NewDefaultHealthCheck()), ), expected: true, }, + { + desc: "readiness check false", + task: task(taskPorts(80)), + application: application( + appPorts(80), + deployments("deploymentId"), + readinessCheck(0), + readinessCheckResult(testTaskName, false), + ), + readyChecker: testReadinessChecker(), + expected: false, + }, } for _, c := range cases { c := c t.Run(c.desc, func(t *testing.T) { t.Parallel() - provider := &Provider{} + provider := &Provider{readyChecker: c.readyChecker} actual := provider.taskFilter(c.task, c.application) if actual != c.expected { t.Errorf("actual %v, expected %v", actual, c.expected) @@ -403,19 +416,19 @@ func TestMarathonApplicationFilterConstraints(t *testing.T) { }{ { desc: "tags missing", - application: createApplication(), + application: application(), marathonLBCompatibility: false, expected: false, }, { desc: "tag matching", - application: createApplication(label(types.LabelTags, "valid")), + application: application(label(types.LabelTags, "valid")), marathonLBCompatibility: false, expected: true, }, { desc: "LB compatibility tag matching", - application: createApplication( + application: application( label("HAPROXY_GROUP", "valid"), label(types.LabelTags, "notvalid"), ), @@ -495,7 +508,7 @@ func TestMarathonApplicationFilterEnabled(t *testing.T) { t.Run(c.desc, func(t *testing.T) { t.Parallel() provider := &Provider{ExposedByDefault: c.exposedByDefault} - app := createApplication(label(types.LabelEnable, c.enabledLabel)) + app := application(label(types.LabelEnable, c.enabledLabel)) if provider.applicationFilter(app) != c.expected { t.Errorf("got unexpected filtering = %t", !c.expected) } @@ -514,70 +527,70 @@ func TestMarathonGetPort(t *testing.T) { }{ { desc: "port missing", - application: createApplication(), - task: createTask(), + application: application(), + task: task(), expected: "", }, { desc: "numeric port", - application: createApplication(label(types.LabelPort, "80")), - task: createTask(), + application: application(label(types.LabelPort, "80")), + task: task(), expected: "80", }, { desc: "string port", - application: createApplication(label(types.LabelPort, "foobar")), - task: createTask(taskPorts(80)), + application: application(label(types.LabelPort, "foobar")), + task: task(taskPorts(80)), expected: "", }, { desc: "negative port", - application: createApplication(label(types.LabelPort, "-1")), - task: createTask(taskPorts(80)), + application: application(label(types.LabelPort, "-1")), + task: task(taskPorts(80)), expected: "", }, { desc: "task port available", - application: createApplication(), - task: createTask(taskPorts(80)), + application: application(), + task: task(taskPorts(80)), expected: "80", }, { desc: "port definition available", - application: createApplication( + application: application( portDefinition(443), ), - task: createTask(), + task: task(), expected: "443", }, { desc: "IP-per-task port available", - application: 
createApplication(ipAddrPerTask(8000)), - task: createTask(), + application: application(ipAddrPerTask(8000)), + task: task(), expected: "8000", }, { desc: "multiple task ports available", - application: createApplication(), - task: createTask(taskPorts(80, 443)), + application: application(), + task: task(taskPorts(80, 443)), expected: "80", }, { desc: "numeric port index specified", - application: createApplication(label(types.LabelPortIndex, "1")), - task: createTask(taskPorts(80, 443)), + application: application(label(types.LabelPortIndex, "1")), + task: task(taskPorts(80, 443)), expected: "443", }, { desc: "string port index specified", - application: createApplication(label(types.LabelPortIndex, "foobar")), - task: createTask(taskPorts(80)), + application: application(label(types.LabelPortIndex, "foobar")), + task: task(taskPorts(80)), expected: "", }, { desc: "task and application ports specified", - application: createApplication(appPorts(9999)), - task: createTask(taskPorts(7777)), + application: application(appPorts(9999)), + task: task(taskPorts(7777)), expected: "7777", }, } @@ -602,12 +615,12 @@ func TestMarathonGetWeight(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: "0", }, { desc: "label existing", - application: createApplication(label(types.LabelWeight, "10")), + application: application(label(types.LabelWeight, "10")), expected: "10", }, } @@ -633,12 +646,12 @@ func TestMarathonGetDomain(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: "docker.localhost", }, { desc: "label existing", - application: createApplication(label(types.LabelDomain, "foo.bar")), + application: application(label(types.LabelDomain, "foo.bar")), expected: "foo.bar", }, } @@ -666,12 +679,12 @@ func TestMarathonGetProtocol(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: "http", }, { desc: "label existing", - application: createApplication(label(types.LabelProtocol, "https")), + application: application(label(types.LabelProtocol, "https")), expected: "https", }, } @@ -697,12 +710,12 @@ func TestMarathonGetSticky(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: "false", }, { desc: "label existing", - application: createApplication(label(types.LabelBackendLoadbalancerSticky, "true")), + application: application(label(types.LabelBackendLoadbalancerSticky, "true")), expected: "true", }, } @@ -728,12 +741,12 @@ func TestMarathonGetPassHostHeader(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: "true", }, { desc: "label existing", - application: createApplication(label(types.LabelFrontendPassHostHeader, "false")), + application: application(label(types.LabelFrontendPassHostHeader, "false")), expected: "false", }, } @@ -759,17 +772,17 @@ func TestMarathonMaxConnAmount(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: math.MaxInt64, }, { desc: "non-integer value", - application: createApplication(label(types.LabelBackendMaxconnAmount, "foobar")), + application: application(label(types.LabelBackendMaxconnAmount, "foobar")), expected: math.MaxInt64, }, { desc: "label existing", - application: createApplication(label(types.LabelBackendMaxconnAmount, "32")), + application: 
application(label(types.LabelBackendMaxconnAmount, "32")), expected: 32, }, } @@ -795,12 +808,12 @@ func TestMarathonGetMaxConnExtractorFunc(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: "request.host", }, { desc: "label existing", - application: createApplication(label(types.LabelBackendMaxconnExtractorfunc, "client.ip")), + application: application(label(types.LabelBackendMaxconnExtractorfunc, "client.ip")), expected: "client.ip", }, } @@ -826,12 +839,12 @@ func TestMarathonGetLoadBalancerMethod(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: "wrr", }, { desc: "label existing", - application: createApplication(label(types.LabelBackendLoadbalancerMethod, "drr")), + application: application(label(types.LabelBackendLoadbalancerMethod, "drr")), expected: "drr", }, } @@ -857,12 +870,12 @@ func TestMarathonGetCircuitBreakerExpression(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: "NetworkErrorRatio() > 1", }, { desc: "label existing", - application: createApplication(label(types.LabelBackendCircuitbreakerExpression, "NetworkErrorRatio() > 0.5")), + application: application(label(types.LabelBackendCircuitbreakerExpression, "NetworkErrorRatio() > 0.5")), expected: "NetworkErrorRatio() > 0.5", }, } @@ -888,12 +901,12 @@ func TestMarathonGetEntryPoints(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: []string{}, }, { desc: "label existing", - application: createApplication(label(types.LabelFrontendEntryPoints, "http,https")), + application: application(label(types.LabelFrontendEntryPoints, "http,https")), expected: []string{"http", "https"}, }, } @@ -920,13 +933,13 @@ func TestMarathonGetFrontendRule(t *testing.T) { }{ { desc: "label missing", - application: createApplication(appID("test")), + application: application(appID("test")), marathonLBCompatibility: true, expected: "Host:test.docker.localhost", }, { desc: "HAProxy vhost available and LB compat disabled", - application: createApplication( + application: application( appID("test"), label("HAPROXY_0_VHOST", "foo.bar"), ), @@ -935,14 +948,14 @@ func TestMarathonGetFrontendRule(t *testing.T) { }, { desc: "HAProxy vhost available and LB compat enabled", - application: createApplication(label("HAPROXY_0_VHOST", "foo.bar")), + application: application(label("HAPROXY_0_VHOST", "foo.bar")), marathonLBCompatibility: true, expected: "Host:foo.bar", }, { desc: "frontend rule available", - application: createApplication( + application: application( label(types.LabelFrontendRule, "Host:foo.bar"), label("HAPROXY_0_VHOST", "unused"), ), @@ -975,12 +988,12 @@ func TestMarathonGetBackend(t *testing.T) { }{ { desc: "label missing", - application: createApplication(appID("/group/app")), + application: application(appID("/group/app")), expected: "-group-app", }, { desc: "label existing", - application: createApplication(label(types.LabelBackend, "bar")), + application: application(label(types.LabelBackend, "bar")), expected: "bar", }, } @@ -1056,7 +1069,7 @@ func TestMarathonHasHealthCheckLabels(t *testing.T) { c := c t.Run(c.desc, func(t *testing.T) { t.Parallel() - app := createApplication() + app := application() if c.value != nil { app.AddLabel(types.LabelBackendHealthcheckPath, *c.value) } @@ -1090,7 +1103,7 @@ func TestMarathonGetHealthCheckPath(t *testing.T) { c := c 
t.Run(c.desc, func(t *testing.T) { t.Parallel() - app := createApplication() + app := application() if c.value != "" { app.AddLabel(types.LabelBackendHealthcheckPath, c.value) } @@ -1124,7 +1137,7 @@ func TestMarathonGetHealthCheckInterval(t *testing.T) { c := c t.Run(c.desc, func(t *testing.T) { t.Parallel() - app := createApplication() + app := application() if c.value != "" { app.AddLabel(types.LabelBackendHealthcheckInterval, c.value) } @@ -1148,49 +1161,49 @@ func TestGetBackendServer(t *testing.T) { }{ { desc: "application without IP-per-task", - application: createApplication(), + application: application(), expectedServer: host, }, { desc: "task hostname override", - application: createApplication(ipAddrPerTask(8000)), + application: application(ipAddrPerTask(8000)), forceTaskHostname: true, expectedServer: host, }, { desc: "task IP address missing", - application: createApplication(ipAddrPerTask(8000)), - task: createTask(), + application: application(ipAddrPerTask(8000)), + task: task(), expectedServer: "", }, { desc: "single task IP address", - application: createApplication(ipAddrPerTask(8000)), - task: createTask(ipAddresses("1.1.1.1")), + application: application(ipAddrPerTask(8000)), + task: task(ipAddresses("1.1.1.1")), expectedServer: "1.1.1.1", }, { desc: "multiple task IP addresses without index label", - application: createApplication(ipAddrPerTask(8000)), - task: createTask(ipAddresses("1.1.1.1", "2.2.2.2")), + application: application(ipAddrPerTask(8000)), + task: task(ipAddresses("1.1.1.1", "2.2.2.2")), expectedServer: "", }, { desc: "multiple task IP addresses with invalid index label", - application: createApplication( + application: application( label("traefik.ipAddressIdx", "invalid"), ipAddrPerTask(8000), ), - task: createTask(ipAddresses("1.1.1.1", "2.2.2.2")), + task: task(ipAddresses("1.1.1.1", "2.2.2.2")), expectedServer: "", }, { desc: "multiple task IP addresses with valid index label", - application: createApplication( + application: application( label("traefik.ipAddressIdx", "1"), ipAddrPerTask(8000), ), - task: createTask(ipAddresses("1.1.1.1", "2.2.2.2")), + task: task(ipAddresses("1.1.1.1", "2.2.2.2")), expectedServer: "2.2.2.2", }, } @@ -1275,12 +1288,12 @@ func TestMarathonGetBasicAuth(t *testing.T) { }{ { desc: "label missing", - application: createApplication(), + application: application(), expected: []string{}, }, { desc: "label existing", - application: createApplication(label(types.LabelFrontendAuthBasic, "user:password")), + application: application(label(types.LabelFrontendAuthBasic, "user:password")), expected: []string{"user:password"}, }, } diff --git a/provider/marathon/readiness.go b/provider/marathon/readiness.go new file mode 100644 index 000000000..6a52ae6b7 --- /dev/null +++ b/provider/marathon/readiness.go @@ -0,0 +1,122 @@ +package marathon + +import ( + "time" + + "github.com/containous/traefik/log" + marathon "github.com/gambol99/go-marathon" +) + +const ( + // readinessCheckDefaultTimeout is the default timeout for a readiness + // check if no check timeout is specified on the application spec. This + // should really never be the case, but better be safe than sorry. + readinessCheckDefaultTimeout time.Duration = 10 * time.Second + // readinessCheckSafetyMargin is some buffer duration to account for + // small offsets in readiness check execution. 
+	readinessCheckSafetyMargin time.Duration = 5 * time.Second
+	readinessLogHeader         string        = "Marathon readiness check: "
+)
+
+type readinessChecker struct {
+	checkDefaultTimeout time.Duration
+	checkSafetyMargin   time.Duration
+	traceLogging        bool
+}
+
+func defaultReadinessChecker(isTraceLogging bool) *readinessChecker {
+	return &readinessChecker{
+		checkDefaultTimeout: readinessCheckDefaultTimeout,
+		checkSafetyMargin:   readinessCheckSafetyMargin,
+		traceLogging:        isTraceLogging,
+	}
+}
+
+func (rc *readinessChecker) Do(task marathon.Task, app marathon.Application) bool {
+	if rc == nil {
+		// Readiness checker disabled.
+		return true
+	}
+
+	switch {
+	case len(app.Deployments) == 0:
+		// We only care about readiness during deployments; post-deployment readiness
+		// can be covered by a periodic post-deployment probe (i.e., Traefik health checks).
+		rc.tracef("task %s app %s: ready = true [no deployment ongoing]", task.ID, app.ID)
+		return true
+
+	case app.ReadinessChecks == nil || len(*app.ReadinessChecks) == 0:
+		// Applications without configured readiness checks are always considered
+		// ready.
+		rc.tracef("task %s app %s: ready = true [no readiness checks on app]", task.ID, app.ID)
+		return true
+	}
+
+	// Loop through all readiness check results and return the result for the
+	// matching task ID.
+	if app.ReadinessCheckResults != nil {
+		for _, readinessCheckResult := range *app.ReadinessCheckResults {
+			if readinessCheckResult.TaskID == task.ID {
+				rc.tracef("task %s app %s: ready = %t [evaluating readiness check ready state]", task.ID, app.ID, readinessCheckResult.Ready)
+				return readinessCheckResult.Ready
+			}
+		}
+	}
+
+	// There's a corner case sometimes hit where the first new task of a
+	// deployment goes from TASK_STAGING to TASK_RUNNING without a corresponding
+	// readiness check result being included in the API response. This only happens
+	// in a very short (yet unlucky) time frame and does not repeat for subsequent
+	// tasks of the same deployment.
+	// Complicating matters, the situation may occur both for initially deploying
+	// applications and for rolling-upgraded ones where one or more tasks from
+	// a previous deployment exist already and are joined by new tasks from a
+	// subsequent deployment. We must always make sure that pre-existing tasks
+	// maintain their ready state while newly launched tasks must be considered
+	// unready until a check result appears.
+	// We distinguish the two cases by comparing the current time with the start
+	// time of the task: It should take Marathon at most one readiness check timeout
+	// interval (plus some safety margin to account for the delayed nature of
+	// distributed systems) for readiness check results to be returned along with the
+	// API response. Once the task is old enough, we assume it to be part of a
+	// pre-existing deployment and mark it as ready. Note that it is okay to err
+	// on the side of caution and consider a task unready until the safety time
+	// window has elapsed because a newly created task should be readiness-checked
+	// and be given a result shortly after its creation (i.e., on the scale
+	// of seconds).
+ readinessCheckTimeoutSecs := (*app.ReadinessChecks)[0].TimeoutSeconds + readinessCheckTimeout := time.Duration(readinessCheckTimeoutSecs) * time.Second + if readinessCheckTimeout == 0 { + rc.tracef("task %s app %s: readiness check timeout not set, using default value %s", task.ID, app.ID, rc.checkDefaultTimeout) + readinessCheckTimeout = rc.checkDefaultTimeout + } else { + readinessCheckTimeout += rc.checkSafetyMargin + } + + startTime, err := time.Parse(time.RFC3339, task.StartedAt) + if err != nil { + // An unparseable start time should never occur; if it does, we assume the + // problem should be surfaced as quickly as possible, which is easiest if + // we shun the task from rotation. + log.Warnf("Failed to parse start-time %s of task %s from application %s: %s (assuming unready)", task.StartedAt, task.ID, app.ID, err) + return false + } + + since := time.Since(startTime) + if since < readinessCheckTimeout { + rc.tracef("task %s app %s: ready = false [task with start-time %s not within assumed check timeout window of %s (elapsed time since task start: %s)]", task.ID, app.ID, startTime.Format(time.RFC3339), readinessCheckTimeout, since) + return false + } + + // Finally, we can be certain this task is not part of the deployment (i.e., + // it's an old task that's going to transition into the TASK_KILLING and/or + // TASK_KILLED state as new tasks' readiness checks gradually turn green.) + rc.tracef("task %s app %s: ready = true [task with start-time %s not involved in deployment (elapsed time since task start: %s)]", task.ID, app.ID, startTime.Format(time.RFC3339), since) + return true +} + +func (rc *readinessChecker) tracef(format string, args ...interface{}) { + if rc.traceLogging { + log.Debugf(readinessLogHeader+format, args...) + } +} diff --git a/provider/marathon/readiness_test.go b/provider/marathon/readiness_test.go new file mode 100644 index 000000000..edfca8241 --- /dev/null +++ b/provider/marathon/readiness_test.go @@ -0,0 +1,134 @@ +package marathon + +import ( + "testing" + "time" + + "github.com/gambol99/go-marathon" +) + +func testReadinessChecker() *readinessChecker { + return defaultReadinessChecker(false) +} + +func TestDisabledReadinessChecker(t *testing.T) { + var rc *readinessChecker + tsk := task() + app := application( + deployments("deploymentId"), + readinessCheck(0), + readinessCheckResult(testTaskName, false), + ) + + if ready := rc.Do(tsk, app); ready == false { + t.Error("expected ready = true") + } +} + +func TestEnabledReadinessChecker(t *testing.T) { + tests := []struct { + desc string + task marathon.Task + app marathon.Application + rc readinessChecker + expectedReady bool + }{ + { + desc: "no deployment running", + task: task(), + app: application(), + expectedReady: true, + }, + { + desc: "no readiness checks defined", + task: task(), + app: application(deployments("deploymentId")), + expectedReady: true, + }, + { + desc: "readiness check result negative", + task: task(), + app: application( + deployments("deploymentId"), + readinessCheck(0), + readinessCheckResult("otherTaskID", true), + readinessCheckResult(testTaskName, false), + ), + expectedReady: false, + }, + { + desc: "readiness check result positive", + task: task(), + app: application( + deployments("deploymentId"), + readinessCheck(0), + readinessCheckResult("otherTaskID", false), + readinessCheckResult(testTaskName, true), + ), + expectedReady: true, + }, + { + desc: "no readiness check result with default timeout", + task: task(startedAtFromNow(3 * time.Minute)), + app: application( + 
deployments("deploymentId"),
+				readinessCheck(0),
+			),
+			rc: readinessChecker{
+				checkDefaultTimeout: 5 * time.Minute,
+			},
+			expectedReady: false,
+		},
+		{
+			desc: "no readiness check result with readiness check timeout",
+			task: task(startedAtFromNow(4 * time.Minute)),
+			app: application(
+				deployments("deploymentId"),
+				readinessCheck(3*time.Minute),
+			),
+			rc: readinessChecker{
+				checkSafetyMargin: 3 * time.Minute,
+			},
+			expectedReady: false,
+		},
+		{
+			desc: "invalid task start time",
+			task: task(startedAt("invalid")),
+			app: application(
+				deployments("deploymentId"),
+				readinessCheck(0),
+			),
+			expectedReady: false,
+		},
+		{
+			desc: "task not involved in deployment",
+			task: task(startedAtFromNow(1 * time.Hour)),
+			app: application(
+				deployments("deploymentId"),
+				readinessCheck(0),
+			),
+			rc: readinessChecker{
+				checkDefaultTimeout: 10 * time.Second,
+			},
+			expectedReady: true,
+		},
+	}
+
+	for _, test := range tests {
+		test := test
+		t.Run(test.desc, func(t *testing.T) {
+			t.Parallel()
+			rc := testReadinessChecker()
+			if test.rc.checkDefaultTimeout > 0 {
+				rc.checkDefaultTimeout = test.rc.checkDefaultTimeout
+			}
+			if test.rc.checkSafetyMargin > 0 {
+				rc.checkSafetyMargin = test.rc.checkSafetyMargin
+			}
+			actualReady := rc.Do(test.task, test.app)
+			if actualReady != test.expectedReady {
+				t.Errorf("actual ready = %t, expected ready = %t", actualReady, test.expectedReady)
+			}
+		})
+	}
+}
diff --git a/traefik.sample.toml b/traefik.sample.toml
index b9ed3c33a..072d63327 100644
--- a/traefik.sample.toml
+++ b/traefik.sample.toml
@@ -672,6 +672,17 @@
 #
 # forceTaskHostname: false
 
+# Applications may define readiness checks which Marathon probes periodically
+# during deployments, exposing the results via the API. Enabling the following
+# parameter causes Traefik to filter out tasks whose readiness checks have
+# not succeeded.
+# Note that the checks are only evaluated during deployments. See the Marathon
+# guide for details.
+#
+# Optional
+# Default: false
+# respectReadinessChecks: false
+
 ################################################################
 # Mesos configuration backend
 ################################################################
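
For reference, a minimal usage sketch of the readiness checker added above. It is hypothetical (written as if it lived alongside readiness.go in the provider/marathon package) and not part of the diff; `defaultReadinessChecker`, `Do`, and the go-marathon types are the names introduced or used by this patch:

```go
package marathon

import (
	"testing"

	marathon "github.com/gambol99/go-marathon"
)

// TestReadinessSketch is illustrative only: it exercises the two observable
// modes of the checker, enabled (non-nil) and disabled (nil).
func TestReadinessSketch(t *testing.T) {
	rc := defaultReadinessChecker(false) // enabled; default timeout and safety margin

	app := marathon.Application{
		Deployments:     []map[string]string{{"ID": "deploymentId"}},
		ReadinessChecks: &[]marathon.ReadinessCheck{{Path: "/ready", TimeoutSeconds: 10}},
		ReadinessCheckResults: &[]marathon.ReadinessCheckResult{
			{TaskID: "task-1", Ready: true},
		},
	}

	// During a deployment, a matching successful check result marks the task ready.
	if !rc.Do(marathon.Task{ID: "task-1"}, app) {
		t.Error("expected task with successful check result to be ready")
	}

	// A nil checker corresponds to respectReadinessChecks being disabled:
	// every task passes the filter.
	var disabled *readinessChecker
	if !disabled.Do(marathon.Task{ID: "task-2"}, app) {
		t.Error("expected disabled checker to report ready")
	}
}
```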