Add request accepting grace period delaying graceful shutdown.

This commit is contained in:
Timo Reimann 2017-09-26 10:22:03 +02:00 committed by Traefiker
parent fc550ac1fc
commit 1c98a9ad3e
10 changed files with 251 additions and 38 deletions

View file

@ -180,6 +180,11 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
DialTimeout: flaeg.Duration(configuration.DefaultDialTimeout), DialTimeout: flaeg.Duration(configuration.DefaultDialTimeout),
} }
// default LifeCycle
defaultLifeycle := configuration.LifeCycle{
GraceTimeOut: flaeg.Duration(configuration.DefaultGraceTimeout),
}
defaultConfiguration := configuration.GlobalConfiguration{ defaultConfiguration := configuration.GlobalConfiguration{
Docker: &defaultDocker, Docker: &defaultDocker,
File: &defaultFile, File: &defaultFile,
@ -202,6 +207,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
ForwardingTimeouts: &forwardingTimeouts, ForwardingTimeouts: &forwardingTimeouts,
TraefikLog: &defaultTraefikLog, TraefikLog: &defaultTraefikLog,
AccessLog: &defaultAccessLog, AccessLog: &defaultAccessLog,
LifeCycle: &defaultLifeycle,
} }
return &TraefikConfiguration{ return &TraefikConfiguration{
@ -213,7 +219,6 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
func NewTraefikConfiguration() *TraefikConfiguration { func NewTraefikConfiguration() *TraefikConfiguration {
return &TraefikConfiguration{ return &TraefikConfiguration{
GlobalConfiguration: configuration.GlobalConfiguration{ GlobalConfiguration: configuration.GlobalConfiguration{
GraceTimeOut: flaeg.Duration(10 * time.Second),
AccessLogsFile: "", AccessLogsFile: "",
TraefikLogsFile: "", TraefikLogsFile: "",
LogLevel: "ERROR", LogLevel: "ERROR",

View file

@ -22,7 +22,6 @@ import (
"github.com/containous/traefik/log" "github.com/containous/traefik/log"
"github.com/containous/traefik/provider/ecs" "github.com/containous/traefik/provider/ecs"
"github.com/containous/traefik/provider/kubernetes" "github.com/containous/traefik/provider/kubernetes"
"github.com/containous/traefik/provider/rancher"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/containous/traefik/server" "github.com/containous/traefik/server"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
@ -228,36 +227,7 @@ func run(globalConfiguration *configuration.GlobalConfiguration) {
http.DefaultTransport.(*http.Transport).Proxy = http.ProxyFromEnvironment http.DefaultTransport.(*http.Transport).Proxy = http.ProxyFromEnvironment
if len(globalConfiguration.EntryPoints) == 0 { globalConfiguration.SetEffectiveConfiguration()
globalConfiguration.EntryPoints = map[string]*configuration.EntryPoint{"http": {Address: ":80"}}
globalConfiguration.DefaultEntryPoints = []string{"http"}
}
if globalConfiguration.Rancher != nil {
// Ensure backwards compatibility for now
if len(globalConfiguration.Rancher.AccessKey) > 0 ||
len(globalConfiguration.Rancher.Endpoint) > 0 ||
len(globalConfiguration.Rancher.SecretKey) > 0 {
if globalConfiguration.Rancher.API == nil {
globalConfiguration.Rancher.API = &rancher.APIConfiguration{
AccessKey: globalConfiguration.Rancher.AccessKey,
SecretKey: globalConfiguration.Rancher.SecretKey,
Endpoint: globalConfiguration.Rancher.Endpoint,
}
}
log.Warn("Deprecated configuration found: rancher.[accesskey|secretkey|endpoint]. " +
"Please use rancher.api.[accesskey|secretkey|endpoint] instead.")
}
if globalConfiguration.Rancher.Metadata != nil && len(globalConfiguration.Rancher.Metadata.Prefix) == 0 {
globalConfiguration.Rancher.Metadata.Prefix = "latest"
}
}
if globalConfiguration.Debug {
globalConfiguration.LogLevel = "DEBUG"
}
// logging // logging
level, err := logrus.ParseLevel(strings.ToLower(globalConfiguration.LogLevel)) level, err := logrus.ParseLevel(strings.ToLower(globalConfiguration.LogLevel))

View file

@ -11,6 +11,7 @@ import (
"github.com/containous/flaeg" "github.com/containous/flaeg"
"github.com/containous/traefik/acme" "github.com/containous/traefik/acme"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider/boltdb" "github.com/containous/traefik/provider/boltdb"
"github.com/containous/traefik/provider/consul" "github.com/containous/traefik/provider/consul"
"github.com/containous/traefik/provider/docker" "github.com/containous/traefik/provider/docker"
@ -37,12 +38,17 @@ const (
// DefaultIdleTimeout before closing an idle connection. // DefaultIdleTimeout before closing an idle connection.
DefaultIdleTimeout = 180 * time.Second DefaultIdleTimeout = 180 * time.Second
// DefaultGraceTimeout controls how long Traefik serves pending requests
// prior to shutting down.
DefaultGraceTimeout = 10 * time.Second
) )
// GlobalConfiguration holds global configuration (with providers, etc.). // GlobalConfiguration holds global configuration (with providers, etc.).
// It's populated from the traefik configuration file passed as an argument to the binary. // It's populated from the traefik configuration file passed as an argument to the binary.
type GlobalConfiguration struct { type GlobalConfiguration struct {
GraceTimeOut flaeg.Duration `short:"g" description:"Duration to give active requests a chance to finish before Traefik stops"` LifeCycle *LifeCycle `description:"Timeouts influencing the server life cycle"`
GraceTimeOut flaeg.Duration `short:"g" description:"(Deprecated) Duration to give active requests a chance to finish before Traefik stops"` // Deprecated
Debug bool `short:"d" description:"Enable debug mode"` Debug bool `short:"d" description:"Enable debug mode"`
CheckNewVersion bool `description:"Periodically check if a new version has been released"` CheckNewVersion bool `description:"Periodically check if a new version has been released"`
AccessLogsFile string `description:"(Deprecated) Access logs file"` // Deprecated AccessLogsFile string `description:"(Deprecated) Access logs file"` // Deprecated
@ -81,6 +87,52 @@ type GlobalConfiguration struct {
DynamoDB *dynamodb.Provider `description:"Enable DynamoDB backend with default settings"` DynamoDB *dynamodb.Provider `description:"Enable DynamoDB backend with default settings"`
} }
// SetEffectiveConfiguration adds missing configuration parameters derived from
// existing ones. It also takes care of maintaining backwards compatibility.
func (gc *GlobalConfiguration) SetEffectiveConfiguration() {
if len(gc.EntryPoints) == 0 {
gc.EntryPoints = map[string]*EntryPoint{"http": {Address: ":80"}}
gc.DefaultEntryPoints = []string{"http"}
}
// Make sure LifeCycle isn't nil to spare nil checks elsewhere.
if gc.LifeCycle == nil {
gc.LifeCycle = &LifeCycle{}
}
// Prefer legacy grace timeout parameter for backwards compatibility reasons.
if gc.GraceTimeOut > 0 {
log.Warn("top-level grace period configuration has been deprecated -- please use lifecycle grace period")
gc.LifeCycle.GraceTimeOut = gc.GraceTimeOut
}
if gc.Rancher != nil {
// Ensure backwards compatibility for now
if len(gc.Rancher.AccessKey) > 0 ||
len(gc.Rancher.Endpoint) > 0 ||
len(gc.Rancher.SecretKey) > 0 {
if gc.Rancher.API == nil {
gc.Rancher.API = &rancher.APIConfiguration{
AccessKey: gc.Rancher.AccessKey,
SecretKey: gc.Rancher.SecretKey,
Endpoint: gc.Rancher.Endpoint,
}
}
log.Warn("Deprecated configuration found: rancher.[accesskey|secretkey|endpoint]. " +
"Please use rancher.api.[accesskey|secretkey|endpoint] instead.")
}
if gc.Rancher.Metadata != nil && len(gc.Rancher.Metadata.Prefix) == 0 {
gc.Rancher.Metadata.Prefix = "latest"
}
}
if gc.Debug {
gc.LogLevel = "DEBUG"
}
}
// DefaultEntryPoints holds default entry points // DefaultEntryPoints holds default entry points
type DefaultEntryPoints []string type DefaultEntryPoints []string
@ -446,3 +498,10 @@ type ForwardingTimeouts struct {
DialTimeout flaeg.Duration `description:"The amount of time to wait until a connection to a backend server can be established. Defaults to 30 seconds. If zero, no timeout exists"` DialTimeout flaeg.Duration `description:"The amount of time to wait until a connection to a backend server can be established. Defaults to 30 seconds. If zero, no timeout exists"`
ResponseHeaderTimeout flaeg.Duration `description:"The amount of time to wait for a server's response headers after fully writing the request (including its body, if any). If zero, no timeout exists"` ResponseHeaderTimeout flaeg.Duration `description:"The amount of time to wait for a server's response headers after fully writing the request (including its body, if any). If zero, no timeout exists"`
} }
// LifeCycle contains configurations relevant to the lifecycle (such as the
// shutdown phase) of Traefik.
type LifeCycle struct {
RequestAcceptGraceTimeout flaeg.Duration `description:"Duration to keep accepting requests before Traefik initiates the graceful shutdown procedure"`
GraceTimeOut flaeg.Duration `description:"Duration to give active requests a chance to finish before Traefik stops"`
}

View file

@ -3,7 +3,9 @@ package configuration
import ( import (
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/containous/flaeg"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -199,3 +201,51 @@ func TestEntryPoints_Set(t *testing.T) {
}) })
} }
} }
func TestSetEffecticeConfiguration(t *testing.T) {
tests := []struct {
desc string
legacyGraceTimeout time.Duration
lifeCycleGraceTimeout time.Duration
wantGraceTimeout time.Duration
}{
{
desc: "legacy grace timeout given only",
legacyGraceTimeout: 5 * time.Second,
wantGraceTimeout: 5 * time.Second,
},
{
desc: "legacy and life cycle grace timeouts given",
legacyGraceTimeout: 5 * time.Second,
lifeCycleGraceTimeout: 12 * time.Second,
wantGraceTimeout: 5 * time.Second,
},
{
desc: "legacy grace timeout omitted",
legacyGraceTimeout: 0,
lifeCycleGraceTimeout: 12 * time.Second,
wantGraceTimeout: 12 * time.Second,
},
}
for _, test := range tests {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
gc := &GlobalConfiguration{
GraceTimeOut: flaeg.Duration(test.legacyGraceTimeout),
}
if test.lifeCycleGraceTimeout > 0 {
gc.LifeCycle = &LifeCycle{
GraceTimeOut: flaeg.Duration(test.lifeCycleGraceTimeout),
}
}
gc.SetEffectiveConfiguration()
gotGraceTimeout := time.Duration(gc.LifeCycle.GraceTimeOut)
if gotGraceTimeout != test.wantGraceTimeout {
t.Fatalf("got effective grace timeout %d, want %d", gotGraceTimeout, test.wantGraceTimeout)
}
})
}
}

View file

@ -3,10 +3,16 @@
## Main Section ## Main Section
```toml ```toml
# Duration to give active requests a chance to finish before Traefik stops. # DEPRECATED - for general usage instruction see [lifeCycle.graceTimeOut].
#
# If both the deprecated option and the new one are given, the deprecated one
# takes precedence.
# A value of zero is equivalent to omitting the parameter, causing
# [lifeCycle.graceTimeOut] to be effective. Pass zero to the new option in
# order to disable the grace period.
# #
# Optional # Optional
# Default: "10s" # Default: "0s"
# #
# graceTimeOut = "10s" # graceTimeOut = "10s"
@ -303,6 +309,38 @@ Given provider-specific support, the value may be overridden on a per-backend ba
Can be provided in a format supported by [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) or as raw values (digits). Can be provided in a format supported by [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) or as raw values (digits).
If no units are provided, the value is parsed assuming seconds. If no units are provided, the value is parsed assuming seconds.
## Life Cycle
Controls the behavior of Traefik during the shutdown phase.
```toml
[lifeCycle]
# Duration to keep accepting requests prior to initiating the graceful
# termination period (as defined by the `graceTimeOut` option). This
# option is meant to give downstream load-balancers sufficient time to
# take Traefik out of rotation.
# Can be provided in a format supported by [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) or as raw values (digits).
# If no units are provided, the value is parsed assuming seconds.
# The zero duration disables the request accepting grace period, i.e.,
# Traefik will immediately proceed to the grace period.
#
# Optional
# Default: 0
#
# requestAcceptGraceTimeout = "10s"
# Duration to give active requests a chance to finish before Traefik stops.
# Can be provided in a format supported by [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) or as raw values (digits).
# If no units are provided, the value is parsed assuming seconds.
# Note: in this time frame no new requests are accepted.
#
# Optional
# Default: "10s"
#
# graceTimeOut = "10s"
```
## Timeouts ## Timeouts
### Responding Timeouts ### Responding Timeouts

View file

@ -3,7 +3,9 @@ package integration
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"os"
"strings" "strings"
"syscall"
"time" "time"
"github.com/containous/traefik/integration/try" "github.com/containous/traefik/integration/try"
@ -101,3 +103,62 @@ func (s *SimpleSuite) TestPrintHelp(c *check.C) {
}) })
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
} }
func (s *SimpleSuite) TestRequestAcceptGraceTimeout(c *check.C) {
s.createComposeProject(c, "reqacceptgrace")
s.composeProject.Start(c)
whoami := "http://" + s.composeProject.Container(c, "whoami").NetworkSettings.IPAddress + ":80"
file := s.adaptFile(c, "fixtures/reqacceptgrace.toml", struct {
Server string
}{whoami})
defer os.Remove(file)
cmd, display := s.traefikCmd(withConfigFile(file))
defer display(c)
err := cmd.Start()
c.Assert(err, checker.IsNil)
defer cmd.Process.Kill()
// Wait for Traefik to turn ready.
err = try.GetRequest("http://127.0.0.1:8000/", 2*time.Second, try.StatusCodeIs(http.StatusNotFound))
c.Assert(err, checker.IsNil)
// Make sure exposed service is ready.
err = try.GetRequest("http://127.0.0.1:8000/service", 3*time.Second, try.StatusCodeIs(http.StatusOK))
c.Assert(err, checker.IsNil)
// Send SIGTERM to Traefik.
proc, err := os.FindProcess(cmd.Process.Pid)
c.Assert(err, checker.IsNil)
err = proc.Signal(syscall.SIGTERM)
c.Assert(err, checker.IsNil)
// Give Traefik time to process the SIGTERM and send a request half-way
// into the request accepting grace period, by which requests should
// still get served.
time.Sleep(5 * time.Second)
resp, err := http.Get("http://127.0.0.1:8000/service")
c.Assert(err, checker.IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, checker.Equals, http.StatusOK)
// Expect Traefik to shut down gracefully once the request accepting grace
// period has elapsed.
waitErr := make(chan error)
go func() {
waitErr <- cmd.Wait()
}()
select {
case err := <-waitErr:
c.Assert(err, checker.IsNil)
case <-time.After(10 * time.Second):
// By now we are ~5 seconds out of the request accepting grace period
// (start + 5 seconds sleep prior to the mid-grace period request +
// 10 seconds timeout = 15 seconds > 10 seconds grace period).
// Something must have gone wrong if we still haven't terminated at
// this point.
c.Fatal("Traefik did not terminate in time")
}
}

View file

@ -0,0 +1,22 @@
defaultEntryPoints = ["http"]
logLevel = "DEBUG"
[entryPoints]
[entryPoints.http]
address = ":8000"
[lifeCycle]
requestAcceptGraceTimeout = "10s"
[file]
[backends]
[backends.backend]
[backends.backend.servers.server]
url = "{{.Server}}"
[frontends]
[frontends.frontend]
backend = "backend"
[frontends.frontend.routes.service]
rule = "Path:/service"

View file

@ -0,0 +1,2 @@
whoami:
image: emilevauge/whoami

View file

@ -203,7 +203,7 @@ func (server *Server) Stop() {
wg.Add(1) wg.Add(1)
go func(serverEntryPointName string, serverEntryPoint *serverEntryPoint) { go func(serverEntryPointName string, serverEntryPoint *serverEntryPoint) {
defer wg.Done() defer wg.Done()
graceTimeOut := time.Duration(server.globalConfiguration.GraceTimeOut) graceTimeOut := time.Duration(server.globalConfiguration.LifeCycle.GraceTimeOut)
ctx, cancel := context.WithTimeout(context.Background(), graceTimeOut) ctx, cancel := context.WithTimeout(context.Background(), graceTimeOut)
log.Debugf("Waiting %s seconds before killing connections on entrypoint %s...", graceTimeOut, serverEntryPointName) log.Debugf("Waiting %s seconds before killing connections on entrypoint %s...", graceTimeOut, serverEntryPointName)
if err := serverEntryPoint.httpServer.Shutdown(ctx); err != nil { if err := serverEntryPoint.httpServer.Shutdown(ctx); err != nil {
@ -220,7 +220,7 @@ func (server *Server) Stop() {
// Close destroys the server // Close destroys the server
func (server *Server) Close() { func (server *Server) Close() {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(server.globalConfiguration.GraceTimeOut)) ctx, cancel := context.WithTimeout(context.Background(), time.Duration(server.globalConfiguration.LifeCycle.GraceTimeOut))
go func(ctx context.Context) { go func(ctx context.Context) {
<-ctx.Done() <-ctx.Done()
if ctx.Err() == context.Canceled { if ctx.Err() == context.Canceled {

View file

@ -5,6 +5,7 @@ package server
import ( import (
"os/signal" "os/signal"
"syscall" "syscall"
"time"
"github.com/containous/traefik/log" "github.com/containous/traefik/log"
) )
@ -31,7 +32,12 @@ func (server *Server) listenSignals() {
} }
default: default:
log.Infof("I have to go... %+v", sig) log.Infof("I have to go... %+v", sig)
log.Info("Stopping server") reqAcceptGraceTimeOut := time.Duration(server.globalConfiguration.LifeCycle.RequestAcceptGraceTimeout)
if reqAcceptGraceTimeOut > 0 {
log.Infof("Waiting %s for incoming requests to cease", reqAcceptGraceTimeOut)
time.Sleep(reqAcceptGraceTimeOut)
}
log.Info("Stopping server gracefully")
server.Stop() server.Stop()
} }
} }