Use long job RetryBackoff in providers

This commit is contained in:
Emile Vauge 2016-08-19 11:09:54 +02:00
parent 7bb5f9a1e4
commit 97ddfcb17a
No known key found for this signature in database
GPG key ID: D808B4C167352E59
6 changed files with 15 additions and 15 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/cenkalti/backoff" "github.com/cenkalti/backoff"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
) )
@ -320,7 +321,7 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess
worker := func() error { worker := func() error {
return provider.watch(configurationChan, stop) return provider.watch(configurationChan, stop)
} }
err := backoff.RetryNotify(worker, backoff.NewExponentialBackOff(), notify) err := utils.RetryNotifyJob(worker, backoff.NewExponentialBackOff(), notify)
if err != nil { if err != nil {
log.Errorf("Cannot connect to consul server %+v", err) log.Errorf("Cannot connect to consul server %+v", err)
} }

View file

@ -15,6 +15,7 @@ import (
"github.com/cenkalti/backoff" "github.com/cenkalti/backoff"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/containous/traefik/version" "github.com/containous/traefik/version"
"github.com/docker/engine-api/client" "github.com/docker/engine-api/client"
dockertypes "github.com/docker/engine-api/types" dockertypes "github.com/docker/engine-api/types"
@ -139,7 +140,7 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage, po
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("Docker connection error %+v, retrying in %s", err, time) log.Errorf("Docker connection error %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil { if err != nil {
log.Errorf("Cannot connect to docker server %+v", err) log.Errorf("Cannot connect to docker server %+v", err)
} }

View file

@ -7,7 +7,7 @@ import (
"github.com/containous/traefik/provider/k8s" "github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"io" "github.com/containous/traefik/utils"
"io/ioutil" "io/ioutil"
"os" "os"
"reflect" "reflect"
@ -104,7 +104,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
for { for {
stopWatch := make(chan bool, 5) stopWatch := make(chan bool, 5)
defer close(stopWatch) defer close(stopWatch)
log.Debugf("Using lable selector: %s", provider.LabelSelector) log.Debugf("Using label selector: '%s'", provider.LabelSelector)
eventsChan, errEventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch) eventsChan, errEventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
if err != nil { if err != nil {
log.Errorf("Error watching kubernetes events: %v", err) log.Errorf("Error watching kubernetes events: %v", err)
@ -116,18 +116,13 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
return nil return nil
} }
} }
Watch:
for { for {
select { select {
case <-stop: case <-stop:
stopWatch <- true stopWatch <- true
return nil return nil
case err, ok := <-errEventsChan: case err, _ := <-errEventsChan:
stopWatch <- true stopWatch <- true
if ok && strings.Contains(err.Error(), io.EOF.Error()) {
// edge case, kubernetes long-polling disconnection
break Watch
}
return err return err
case event := <-eventsChan: case event := <-eventsChan:
log.Debugf("Received event from kubernetes %+v", event) log.Debugf("Received event from kubernetes %+v", event)
@ -152,7 +147,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time) log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(operation, backOff, notify) err := utils.RetryNotifyJob(operation, backOff, notify)
if err != nil { if err != nil {
log.Errorf("Cannot connect to Kubernetes server %+v", err) log.Errorf("Cannot connect to Kubernetes server %+v", err)
} }

View file

@ -13,6 +13,7 @@ import (
"github.com/cenkalti/backoff" "github.com/cenkalti/backoff"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/docker/libkv" "github.com/docker/libkv"
"github.com/docker/libkv/store" "github.com/docker/libkv/store"
) )
@ -75,7 +76,7 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("KV connection error: %+v, retrying in %s", err, time) log.Errorf("KV connection error: %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil { if err != nil {
return fmt.Errorf("Cannot connect to KV server: %v", err) return fmt.Errorf("Cannot connect to KV server: %v", err)
} }
@ -105,7 +106,7 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("KV connection error: %+v, retrying in %s", err, time) log.Errorf("KV connection error: %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil { if err != nil {
return fmt.Errorf("Cannot connect to KV server: %v", err) return fmt.Errorf("Cannot connect to KV server: %v", err)
} }

View file

@ -13,6 +13,7 @@ import (
"github.com/cenkalti/backoff" "github.com/cenkalti/backoff"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/gambol99/go-marathon" "github.com/gambol99/go-marathon"
"net/http" "net/http"
"time" "time"
@ -108,7 +109,7 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("Marathon connection error %+v, retrying in %s", err, time) log.Errorf("Marathon connection error %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil { if err != nil {
log.Errorf("Cannot connect to Marathon server %+v", err) log.Errorf("Cannot connect to Marathon server %+v", err)
} }

View file

@ -12,6 +12,7 @@ import (
"github.com/cenkalti/backoff" "github.com/cenkalti/backoff"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/mesos/mesos-go/detector" "github.com/mesos/mesos-go/detector"
_ "github.com/mesos/mesos-go/detector/zoo" // Registers the ZK detector _ "github.com/mesos/mesos-go/detector/zoo" // Registers the ZK detector
"github.com/mesosphere/mesos-dns/detect" "github.com/mesosphere/mesos-dns/detect"
@ -110,7 +111,7 @@ func (provider *Mesos) Provide(configurationChan chan<- types.ConfigMessage, poo
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("mesos connection error %+v, retrying in %s", err, time) log.Errorf("mesos connection error %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil { if err != nil {
log.Errorf("Cannot connect to mesos server %+v", err) log.Errorf("Cannot connect to mesos server %+v", err)
} }