Merge pull request #70 from vdemeester/carry-pr-48
Carry Add backend throttle duration #48
This commit is contained in:
commit
86f95924a9
5 changed files with 65 additions and 18 deletions
|
@ -3,6 +3,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type GlobalConfiguration struct {
|
type GlobalConfiguration struct {
|
||||||
|
@ -12,6 +13,7 @@ type GlobalConfiguration struct {
|
||||||
TraefikLogsFile string
|
TraefikLogsFile string
|
||||||
CertFile, KeyFile string
|
CertFile, KeyFile string
|
||||||
LogLevel string
|
LogLevel string
|
||||||
|
ProvidersThrottleDuration time.Duration
|
||||||
Docker *DockerProvider
|
Docker *DockerProvider
|
||||||
File *FileProvider
|
File *FileProvider
|
||||||
Web *WebProvider
|
Web *WebProvider
|
||||||
|
@ -28,6 +30,7 @@ func NewGlobalConfiguration() *GlobalConfiguration {
|
||||||
globalConfiguration.Port = ":80"
|
globalConfiguration.Port = ":80"
|
||||||
globalConfiguration.GraceTimeOut = 10
|
globalConfiguration.GraceTimeOut = 10
|
||||||
globalConfiguration.LogLevel = "ERROR"
|
globalConfiguration.LogLevel = "ERROR"
|
||||||
|
globalConfiguration.ProvidersThrottleDuration = time.Duration(2 * time.Second)
|
||||||
|
|
||||||
return globalConfiguration
|
return globalConfiguration
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,6 +107,16 @@ For example:
|
||||||
#
|
#
|
||||||
# CertFile = "traefik.crt"
|
# CertFile = "traefik.crt"
|
||||||
# KeyFile = "traefik.key"
|
# KeyFile = "traefik.key"
|
||||||
|
|
||||||
|
# Backends throttle duration: minimum duration between 2 events from providers
|
||||||
|
# before applying a new configuration. It avoids unnecessary reloads if multiples events
|
||||||
|
# are sent in a short amount of time.
|
||||||
|
#
|
||||||
|
# Optional
|
||||||
|
# Default: "2s"
|
||||||
|
#
|
||||||
|
# ProvidersThrottleDuration = "5s"
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *MarathonSuite) TestSimpleConfiguration(c *check.C) {
|
func (s *MarathonSuite) TestSimpleConfiguration(c *check.C) {
|
||||||
cmd := exec.Command(traefikBinary, "fixtures/consul/simple.toml")
|
cmd := exec.Command(traefikBinary, "fixtures/marathon/simple.toml")
|
||||||
err := cmd.Start()
|
err := cmd.Start()
|
||||||
c.Assert(err, checker.IsNil)
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ func (s *MarathonSuite) TestSimpleConfiguration(c *check.C) {
|
||||||
// TODO validate : run on 80
|
// TODO validate : run on 80
|
||||||
resp, err := http.Get("http://127.0.0.1/")
|
resp, err := http.Get("http://127.0.0.1/")
|
||||||
|
|
||||||
// Expected a 404 as we did not comfigure anything
|
// Expected a 404 as we did not configure anything
|
||||||
c.Assert(err, checker.IsNil)
|
c.Assert(err, checker.IsNil)
|
||||||
c.Assert(resp.StatusCode, checker.Equals, 404)
|
c.Assert(resp.StatusCode, checker.Equals, 404)
|
||||||
|
|
||||||
|
|
28
traefik.go
28
traefik.go
|
@ -59,6 +59,8 @@ func main() {
|
||||||
var configurationRouter *mux.Router
|
var configurationRouter *mux.Router
|
||||||
var configurationChan = make(chan configMessage, 10)
|
var configurationChan = make(chan configMessage, 10)
|
||||||
defer close(configurationChan)
|
defer close(configurationChan)
|
||||||
|
var configurationChanValidated = make(chan configMessage, 10)
|
||||||
|
defer close(configurationChanValidated)
|
||||||
var sigs = make(chan os.Signal, 1)
|
var sigs = make(chan os.Signal, 1)
|
||||||
defer close(sigs)
|
defer close(sigs)
|
||||||
var stopChan = make(chan bool)
|
var stopChan = make(chan bool)
|
||||||
|
@ -96,10 +98,32 @@ func main() {
|
||||||
|
|
||||||
// listen new configurations from providers
|
// listen new configurations from providers
|
||||||
go func() {
|
go func() {
|
||||||
|
lastReceivedConfiguration := time.Unix(0, 0)
|
||||||
|
lastConfigs := make(map[string]*configMessage)
|
||||||
for {
|
for {
|
||||||
configMsg := <-configurationChan
|
configMsg := <-configurationChan
|
||||||
log.Infof("Configuration receveived from provider %s: %#v", configMsg.providerName, configMsg.configuration)
|
log.Infof("Configuration receveived from provider %s: %#v", configMsg.providerName, configMsg.configuration)
|
||||||
|
lastConfigs[configMsg.providerName] = &configMsg
|
||||||
|
if time.Now().After(lastReceivedConfiguration.Add(time.Duration(globalConfiguration.ProvidersThrottleDuration))) {
|
||||||
|
log.Infof("Last %s config received more than %s, OK", configMsg.providerName, globalConfiguration.ProvidersThrottleDuration)
|
||||||
|
// last config received more than n s ago
|
||||||
|
configurationChanValidated <- configMsg
|
||||||
|
} else {
|
||||||
|
log.Infof("Last %s config received less than %s, waiting...", configMsg.providerName, globalConfiguration.ProvidersThrottleDuration)
|
||||||
|
go func() {
|
||||||
|
<-time.After(globalConfiguration.ProvidersThrottleDuration)
|
||||||
|
if time.Now().After(lastReceivedConfiguration.Add(time.Duration(globalConfiguration.ProvidersThrottleDuration))) {
|
||||||
|
log.Infof("Waited for %s config, OK", configMsg.providerName)
|
||||||
|
configurationChanValidated <- *lastConfigs[configMsg.providerName]
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
lastReceivedConfiguration = time.Now()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
configMsg := <-configurationChanValidated
|
||||||
if configMsg.configuration == nil {
|
if configMsg.configuration == nil {
|
||||||
log.Info("Skipping empty configuration")
|
log.Info("Skipping empty configuration")
|
||||||
} else if reflect.DeepEqual(currentConfigurations[configMsg.providerName], configMsg.configuration) {
|
} else if reflect.DeepEqual(currentConfigurations[configMsg.providerName], configMsg.configuration) {
|
||||||
|
@ -120,7 +144,7 @@ func main() {
|
||||||
newsrv := prepareServer(configurationRouter, globalConfiguration, oldServer, loggerMiddleware, metrics)
|
newsrv := prepareServer(configurationRouter, globalConfiguration, oldServer, loggerMiddleware, metrics)
|
||||||
go startServer(newsrv, globalConfiguration)
|
go startServer(newsrv, globalConfiguration)
|
||||||
srv = newsrv
|
srv = newsrv
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
if oldServer != nil {
|
if oldServer != nil {
|
||||||
log.Info("Stopping old server")
|
log.Info("Stopping old server")
|
||||||
oldServer.Close()
|
oldServer.Close()
|
||||||
|
|
|
@ -44,6 +44,16 @@
|
||||||
# CertFile = "traefik.crt"
|
# CertFile = "traefik.crt"
|
||||||
# KeyFile = "traefik.key"
|
# KeyFile = "traefik.key"
|
||||||
|
|
||||||
|
# Backends throttle duration: minimum duration between 2 events from providers
|
||||||
|
# before applying a new configuration. It avoids unnecessary reloads if multiples events
|
||||||
|
# are sent in a short amount of time.
|
||||||
|
#
|
||||||
|
# Optional
|
||||||
|
# Default: "2s"
|
||||||
|
#
|
||||||
|
# ProvidersThrottleDuration = "5s"
|
||||||
|
|
||||||
|
|
||||||
################################################################
|
################################################################
|
||||||
# Web configuration backend
|
# Web configuration backend
|
||||||
################################################################
|
################################################################
|
||||||
|
|
Loading…
Reference in a new issue