Merge pull request #627 from containous/add-long-job-exponential-backoff
Add long job exponential backoff
This commit is contained in:
commit
95e8f0a31e
10 changed files with 68 additions and 27 deletions
|
@ -56,7 +56,7 @@ func NewLogger(file string) *Logger {
|
||||||
if len(file) > 0 {
|
if len(file) > 0 {
|
||||||
fi, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
fi, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Error opening file", err)
|
log.Error("Error opening file", err)
|
||||||
}
|
}
|
||||||
return &Logger{fi}
|
return &Logger{fi}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/cenkalti/backoff"
|
"github.com/cenkalti/backoff"
|
||||||
"github.com/containous/traefik/safe"
|
"github.com/containous/traefik/safe"
|
||||||
"github.com/containous/traefik/types"
|
"github.com/containous/traefik/types"
|
||||||
|
"github.com/containous/traefik/utils"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -320,9 +321,9 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess
|
||||||
worker := func() error {
|
worker := func() error {
|
||||||
return provider.watch(configurationChan, stop)
|
return provider.watch(configurationChan, stop)
|
||||||
}
|
}
|
||||||
err := backoff.RetryNotify(worker, backoff.NewExponentialBackOff(), notify)
|
err := utils.RetryNotifyJob(worker, backoff.NewExponentialBackOff(), notify)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Cannot connect to consul server %+v", err)
|
log.Errorf("Cannot connect to consul server %+v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/cenkalti/backoff"
|
"github.com/cenkalti/backoff"
|
||||||
"github.com/containous/traefik/safe"
|
"github.com/containous/traefik/safe"
|
||||||
"github.com/containous/traefik/types"
|
"github.com/containous/traefik/types"
|
||||||
|
"github.com/containous/traefik/utils"
|
||||||
"github.com/containous/traefik/version"
|
"github.com/containous/traefik/version"
|
||||||
"github.com/docker/engine-api/client"
|
"github.com/docker/engine-api/client"
|
||||||
dockertypes "github.com/docker/engine-api/types"
|
dockertypes "github.com/docker/engine-api/types"
|
||||||
|
@ -139,9 +140,9 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage, po
|
||||||
notify := func(err error, time time.Duration) {
|
notify := func(err error, time time.Duration) {
|
||||||
log.Errorf("Docker connection error %+v, retrying in %s", err, time)
|
log.Errorf("Docker connection error %+v, retrying in %s", err, time)
|
||||||
}
|
}
|
||||||
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
|
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Cannot connect to docker server %+v", err)
|
log.Errorf("Cannot connect to docker server %+v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"github.com/containous/traefik/provider/k8s"
|
"github.com/containous/traefik/provider/k8s"
|
||||||
"github.com/containous/traefik/safe"
|
"github.com/containous/traefik/safe"
|
||||||
"github.com/containous/traefik/types"
|
"github.com/containous/traefik/types"
|
||||||
"io"
|
"github.com/containous/traefik/utils"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
@ -104,7 +104,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
|
||||||
for {
|
for {
|
||||||
stopWatch := make(chan bool, 5)
|
stopWatch := make(chan bool, 5)
|
||||||
defer close(stopWatch)
|
defer close(stopWatch)
|
||||||
log.Debugf("Using lable selector: %s", provider.LabelSelector)
|
log.Debugf("Using label selector: '%s'", provider.LabelSelector)
|
||||||
eventsChan, errEventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
|
eventsChan, errEventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error watching kubernetes events: %v", err)
|
log.Errorf("Error watching kubernetes events: %v", err)
|
||||||
|
@ -116,18 +116,13 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Watch:
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
stopWatch <- true
|
stopWatch <- true
|
||||||
return nil
|
return nil
|
||||||
case err, ok := <-errEventsChan:
|
case err, _ := <-errEventsChan:
|
||||||
stopWatch <- true
|
stopWatch <- true
|
||||||
if ok && strings.Contains(err.Error(), io.EOF.Error()) {
|
|
||||||
// edge case, kubernetes long-polling disconnection
|
|
||||||
break Watch
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
case event := <-eventsChan:
|
case event := <-eventsChan:
|
||||||
log.Debugf("Received event from kubernetes %+v", event)
|
log.Debugf("Received event from kubernetes %+v", event)
|
||||||
|
@ -152,9 +147,9 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
|
||||||
notify := func(err error, time time.Duration) {
|
notify := func(err error, time time.Duration) {
|
||||||
log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time)
|
log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time)
|
||||||
}
|
}
|
||||||
err := backoff.RetryNotify(operation, backOff, notify)
|
err := utils.RetryNotifyJob(operation, backOff, notify)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Cannot connect to Kubernetes server %+v", err)
|
log.Errorf("Cannot connect to Kubernetes server %+v", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/cenkalti/backoff"
|
"github.com/cenkalti/backoff"
|
||||||
"github.com/containous/traefik/safe"
|
"github.com/containous/traefik/safe"
|
||||||
"github.com/containous/traefik/types"
|
"github.com/containous/traefik/types"
|
||||||
|
"github.com/containous/traefik/utils"
|
||||||
"github.com/docker/libkv"
|
"github.com/docker/libkv"
|
||||||
"github.com/docker/libkv/store"
|
"github.com/docker/libkv/store"
|
||||||
)
|
)
|
||||||
|
@ -75,7 +76,7 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix
|
||||||
notify := func(err error, time time.Duration) {
|
notify := func(err error, time time.Duration) {
|
||||||
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
|
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
|
||||||
}
|
}
|
||||||
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
|
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -105,7 +106,7 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
|
||||||
notify := func(err error, time time.Duration) {
|
notify := func(err error, time time.Duration) {
|
||||||
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
|
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
|
||||||
}
|
}
|
||||||
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
|
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/cenkalti/backoff"
|
"github.com/cenkalti/backoff"
|
||||||
"github.com/containous/traefik/safe"
|
"github.com/containous/traefik/safe"
|
||||||
"github.com/containous/traefik/types"
|
"github.com/containous/traefik/types"
|
||||||
|
"github.com/containous/traefik/utils"
|
||||||
"github.com/gambol99/go-marathon"
|
"github.com/gambol99/go-marathon"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
@ -108,9 +109,9 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
|
||||||
notify := func(err error, time time.Duration) {
|
notify := func(err error, time time.Duration) {
|
||||||
log.Errorf("Marathon connection error %+v, retrying in %s", err, time)
|
log.Errorf("Marathon connection error %+v, retrying in %s", err, time)
|
||||||
}
|
}
|
||||||
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
|
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Cannot connect to Marathon server %+v", err)
|
log.Errorf("Cannot connect to Marathon server %+v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/cenkalti/backoff"
|
"github.com/cenkalti/backoff"
|
||||||
"github.com/containous/traefik/safe"
|
"github.com/containous/traefik/safe"
|
||||||
"github.com/containous/traefik/types"
|
"github.com/containous/traefik/types"
|
||||||
|
"github.com/containous/traefik/utils"
|
||||||
"github.com/mesos/mesos-go/detector"
|
"github.com/mesos/mesos-go/detector"
|
||||||
_ "github.com/mesos/mesos-go/detector/zoo" // Registers the ZK detector
|
_ "github.com/mesos/mesos-go/detector/zoo" // Registers the ZK detector
|
||||||
"github.com/mesosphere/mesos-dns/detect"
|
"github.com/mesosphere/mesos-dns/detect"
|
||||||
|
@ -110,9 +111,9 @@ func (provider *Mesos) Provide(configurationChan chan<- types.ConfigMessage, poo
|
||||||
notify := func(err error, time time.Duration) {
|
notify := func(err error, time time.Duration) {
|
||||||
log.Errorf("mesos connection error %+v, retrying in %s", err, time)
|
log.Errorf("mesos connection error %+v, retrying in %s", err, time)
|
||||||
}
|
}
|
||||||
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
|
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Cannot connect to mesos server %+v", err)
|
log.Errorf("Cannot connect to mesos server %+v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -393,9 +394,9 @@ func detectMasters(zk string, masters []string) <-chan []string {
|
||||||
if zk != "" {
|
if zk != "" {
|
||||||
log.Debugf("Starting master detector for ZK ", zk)
|
log.Debugf("Starting master detector for ZK ", zk)
|
||||||
if md, err := detector.New(zk); err != nil {
|
if md, err := detector.New(zk); err != nil {
|
||||||
log.Fatalf("failed to create master detector: %v", err)
|
log.Errorf("failed to create master detector: %v", err)
|
||||||
} else if err := md.Detect(detect.NewMasters(masters, changed)); err != nil {
|
} else if err := md.Detect(detect.NewMasters(masters, changed)); err != nil {
|
||||||
log.Fatalf("failed to initialize master detector: %v", err)
|
log.Errorf("failed to initialize master detector: %v", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
changed <- masters
|
changed <- masters
|
||||||
|
|
|
@ -413,7 +413,7 @@ func (server *Server) prepareServer(entryPointName string, router *middlewares.H
|
||||||
negroni.UseHandler(router)
|
negroni.UseHandler(router)
|
||||||
tlsConfig, err := server.createTLSConfig(entryPointName, entryPoint.TLS, router)
|
tlsConfig, err := server.createTLSConfig(entryPointName, entryPoint.TLS, router)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Error creating TLS config %s", err)
|
log.Errorf("Error creating TLS config %s", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -431,7 +431,7 @@ func (server *Server) prepareServer(entryPointName string, router *middlewares.H
|
||||||
TLSConfig: tlsConfig,
|
TLSConfig: tlsConfig,
|
||||||
}, tlsConfig)
|
}, tlsConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Error hijacking server %s", err)
|
log.Errorf("Error hijacking server %s", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return gracefulServer, nil
|
return gracefulServer, nil
|
||||||
|
|
|
@ -203,7 +203,7 @@ func run(traefikConfiguration *TraefikConfiguration) {
|
||||||
// logging
|
// logging
|
||||||
level, err := log.ParseLevel(strings.ToLower(globalConfiguration.LogLevel))
|
level, err := log.ParseLevel(strings.ToLower(globalConfiguration.LogLevel))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Error getting level", err)
|
log.Error("Error getting level", err)
|
||||||
}
|
}
|
||||||
log.SetLevel(level)
|
log.SetLevel(level)
|
||||||
if len(globalConfiguration.TraefikLogsFile) > 0 {
|
if len(globalConfiguration.TraefikLogsFile) > 0 {
|
||||||
|
@ -214,7 +214,7 @@ func run(traefikConfiguration *TraefikConfiguration) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("Error opening file", err)
|
log.Error("Error opening file", err)
|
||||||
} else {
|
} else {
|
||||||
log.SetOutput(fi)
|
log.SetOutput(fi)
|
||||||
log.SetFormatter(&log.TextFormatter{DisableColors: true, FullTimestamp: true, DisableSorting: true})
|
log.SetFormatter(&log.TextFormatter{DisableColors: true, FullTimestamp: true, DisableSorting: true})
|
||||||
|
|
41
utils/retry.go
Normal file
41
utils/retry.go
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/cenkalti/backoff"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
minLongJobInterval = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// RetryNotifyJob calls notify function with the error and wait duration
|
||||||
|
// for each failed attempt before sleep.
|
||||||
|
func RetryNotifyJob(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify) error {
|
||||||
|
var err error
|
||||||
|
var next time.Duration
|
||||||
|
|
||||||
|
b.Reset()
|
||||||
|
for {
|
||||||
|
before := time.Now()
|
||||||
|
if err = operation(); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
elapsed := time.Since(before)
|
||||||
|
|
||||||
|
// If long job, we reset the backoff
|
||||||
|
if elapsed >= minLongJobInterval {
|
||||||
|
b.Reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
if next = b.NextBackOff(); next == backoff.Stop {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if notify != nil {
|
||||||
|
notify(err, next)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(next)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue