diff --git a/cluster/datastore.go b/cluster/datastore.go index 5f3896772..5bad330a8 100644 --- a/cluster/datastore.go +++ b/cluster/datastore.go @@ -94,8 +94,8 @@ func (d *Datastore) watchChanges() error { // Begin creates a transaction with the KV store. func (d *Datastore) Begin() (*Transaction, error) { - value := uuid.NewV4().String() - remoteLock, err := d.kv.NewLock(d.lockKey, &store.LockOptions{TTL: 20 * time.Second, Value: []byte(value)}) + id := uuid.NewV4().String() + remoteLock, err := d.kv.NewLock(d.lockKey, &store.LockOptions{TTL: 20 * time.Second, Value: []byte(id)}) if err != nil { return nil, err } @@ -119,8 +119,8 @@ func (d *Datastore) Begin() (*Transaction, error) { // we got the lock! Now make sure we are synced with KV store operation := func() error { meta := d.get() - if meta.Lock != value { - return fmt.Errorf("Object lock value: expected %s, got %s", value, meta.Lock) + if meta.Lock != id { + return fmt.Errorf("Object lock value: expected %s, got %s", id, meta.Lock) } return nil } diff --git a/cluster/leadership.go b/cluster/leadership.go index 851e90010..60d67e427 100644 --- a/cluster/leadership.go +++ b/cluster/leadership.go @@ -6,22 +6,34 @@ import ( "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/docker/leadership" + "golang.org/x/net/context" "time" ) // Leadership allows leadership election using a KV store type Leadership struct { - types.Cluster + *safe.Pool + *types.Cluster candidate *leadership.Candidate } +// NewLeadership creates a leadership +func NewLeadership(ctx context.Context, cluster *types.Cluster) *Leadership { + return &Leadership{ + Pool: safe.NewPool(ctx), + Cluster: cluster, + candidate: leadership.NewCandidate(cluster.Store, cluster.Store.Prefix+"/leader", cluster.Node, 20*time.Second), + } +} + // Participate tries to be a leader -func (l *Leadership) Participate(pool *safe.Pool, isElected func(bool)) { - pool.Go(func(stop chan bool) { - l.candidate = leadership.NewCandidate(l.Store, l.Store.Prefix+"/leader", l.Node, 30*time.Second) +func (l *Leadership) Participate(pool *safe.Pool) { + pool.GoCtx(func(ctx context.Context) { + log.Debugf("Node %s running for election", l.Cluster.Node) + defer log.Debugf("Node %s no more running for election", l.Cluster.Node) backOff := backoff.NewExponentialBackOff() operation := func() error { - return l.run(l.candidate, stop, isElected) + return l.run(l.candidate, ctx) } notify := func(err error, time time.Duration) { @@ -36,21 +48,31 @@ func (l *Leadership) Participate(pool *safe.Pool, isElected func(bool)) { // Resign resigns from being a leader func (l *Leadership) Resign() { - if l.candidate != nil { - l.candidate.Resign() - } + l.candidate.Resign() + log.Infof("Node %s resined", l.Cluster.Node) } -func (l *Leadership) run(candidate *leadership.Candidate, stop chan bool, isElected func(bool)) error { +func (l *Leadership) run(candidate *leadership.Candidate, ctx context.Context) error { electedCh, errCh := candidate.RunForElection() for { select { case elected := <-electedCh: - isElected(elected) + l.onElection(elected) case err := <-errCh: return err - case <-stop: + case <-ctx.Done(): + l.candidate.Resign() return nil } } } + +func (l *Leadership) onElection(elected bool) { + if elected { + log.Infof("Node %s elected leader ♚", l.Cluster.Node) + l.Start() + } else { + log.Infof("Node %s elected slave ♝", l.Cluster.Node) + l.Stop() + } +} diff --git a/provider/kv.go b/provider/kv.go index bc60f80cd..86c97d1b7 100644 --- a/provider/kv.go +++ b/provider/kv.go @@ -148,7 +148,7 @@ func (provider *Kv) list(keys ...string) []string { joinedKeys := strings.Join(keys, "") keysPairs, err := provider.kvclient.List(joinedKeys) if err != nil { - log.Errorf("Error getting keys %s %s ", joinedKeys, err) + log.Warnf("Cannot get keys %s %s ", joinedKeys, err) return nil } directoryKeys := make(map[string]string) @@ -170,10 +170,10 @@ func (provider *Kv) get(defaultValue string, keys ...string) string { joinedKeys := strings.Join(keys, "") keyPair, err := provider.kvclient.Get(strings.TrimPrefix(joinedKeys, "/")) if err != nil { - log.Warnf("Error getting key %s %s, setting default %s", joinedKeys, err, defaultValue) + log.Warnf("Cannot get key %s %s, setting default %s", joinedKeys, err, defaultValue) return defaultValue } else if keyPair == nil { - log.Warnf("Error getting key %s, setting default %s", joinedKeys, defaultValue) + log.Warnf("Cannot get key %s, setting default %s", joinedKeys, defaultValue) return defaultValue } return string(keyPair.Value) @@ -183,10 +183,10 @@ func (provider *Kv) splitGet(keys ...string) []string { joinedKeys := strings.Join(keys, "") keyPair, err := provider.kvclient.Get(joinedKeys) if err != nil { - log.Warnf("Error getting key %s %s, setting default empty", joinedKeys, err) + log.Warnf("Cannot get key %s %s, setting default empty", joinedKeys, err) return []string{} } else if keyPair == nil { - log.Warnf("Error getting key %s, setting default %empty", joinedKeys) + log.Warnf("Cannot get key %s, setting default %empty", joinedKeys) return []string{} } return strings.Split(string(keyPair.Value), ",") diff --git a/safe/routine.go b/safe/routine.go index 7fe9498dc..75e2358b7 100644 --- a/safe/routine.go +++ b/safe/routine.go @@ -1,6 +1,7 @@ package safe import ( + "golang.org/x/net/context" "log" "runtime/debug" "sync" @@ -11,11 +12,44 @@ type routine struct { stop chan bool } -// Pool creates a pool of go routines +type routineCtx func(ctx context.Context) + +// Pool is a pool of go routines type Pool struct { - routines []routine - waitGroup sync.WaitGroup - lock sync.Mutex + routines []routine + routinesCtx []routineCtx + waitGroup sync.WaitGroup + lock sync.Mutex + baseCtx context.Context + ctx context.Context + cancel context.CancelFunc +} + +// NewPool creates a Pool +func NewPool(baseCtx context.Context) *Pool { + ctx, cancel := context.WithCancel(baseCtx) + return &Pool{ + ctx: ctx, + cancel: cancel, + baseCtx: baseCtx, + } +} + +// Ctx returns main context +func (p *Pool) Ctx() context.Context { + return p.ctx +} + +//GoCtx starts a recoverable goroutine with a context +func (p *Pool) GoCtx(goroutine routineCtx) { + p.lock.Lock() + p.routinesCtx = append(p.routinesCtx, goroutine) + p.waitGroup.Add(1) + Go(func() { + goroutine(p.ctx) + p.waitGroup.Done() + }) + p.lock.Unlock() } // Go starts a recoverable goroutine, and can be stopped with stop chan @@ -37,6 +71,7 @@ func (p *Pool) Go(goroutine func(stop chan bool)) { // Stop stops all started routines, waiting for their termination func (p *Pool) Stop() { p.lock.Lock() + p.cancel() for _, routine := range p.routines { routine.stop <- true } @@ -47,6 +82,29 @@ func (p *Pool) Stop() { p.lock.Unlock() } +// Start starts all stoped routines +func (p *Pool) Start() { + p.lock.Lock() + p.ctx, p.cancel = context.WithCancel(p.baseCtx) + for _, routine := range p.routines { + p.waitGroup.Add(1) + routine.stop = make(chan bool, 1) + Go(func() { + routine.goroutine(routine.stop) + p.waitGroup.Done() + }) + } + + for _, routine := range p.routinesCtx { + p.waitGroup.Add(1) + Go(func() { + routine(p.ctx) + p.waitGroup.Done() + }) + } + p.lock.Unlock() +} + // Go starts a recoverable goroutine func Go(goroutine func()) { GoWithRecover(goroutine, defaultRecoverGoroutine) diff --git a/server.go b/server.go index 48c8c3ffb..a9222bf2a 100644 --- a/server.go +++ b/server.go @@ -24,6 +24,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/codegangsta/negroni" "github.com/containous/mux" + "github.com/containous/traefik/cluster" "github.com/containous/traefik/middlewares" "github.com/containous/traefik/provider" "github.com/containous/traefik/safe" @@ -50,7 +51,8 @@ type Server struct { currentConfigurations safe.Safe globalConfiguration GlobalConfiguration loggerMiddleware *middlewares.Logger - routinesPool safe.Pool + routinesPool *safe.Pool + leadership *cluster.Leadership } type serverEntryPoints map[string]*serverEntryPoint @@ -80,12 +82,18 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server { server.currentConfigurations.Set(currentConfigurations) server.globalConfiguration = globalConfiguration server.loggerMiddleware = middlewares.NewLogger(globalConfiguration.AccessLogsFile) + server.routinesPool = safe.NewPool(context.Background()) + if globalConfiguration.Cluster != nil { + // leadership creation if cluster mode + server.leadership = cluster.NewLeadership(server.routinesPool.Ctx(), globalConfiguration.Cluster) + } return server } // Start starts the server and blocks until server is shutted down. func (server *Server) Start() { + server.startLeadership() server.startHTTPServers() server.routinesPool.Go(func(stop chan bool) { server.listenProviders(stop) @@ -125,6 +133,7 @@ func (server *Server) Close() { os.Exit(1) } }(ctx) + server.stopLeadership() server.routinesPool.Stop() close(server.configurationChan) close(server.configurationValidatedChan) @@ -135,6 +144,23 @@ func (server *Server) Close() { cancel() } +func (server *Server) startLeadership() { + if server.leadership != nil { + server.leadership.Participate(server.routinesPool) + server.leadership.GoCtx(func(ctx context.Context) { + log.Debugf("Started test routine") + <-ctx.Done() + log.Debugf("Stopped test routine") + }) + } +} + +func (server *Server) stopLeadership() { + if server.leadership != nil { + server.leadership.Resign() + } +} + func (server *Server) startHTTPServers() { server.serverEntryPoints = server.buildEntryPoints(server.globalConfiguration) for newServerEntryPointName, newServerEntryPoint := range server.serverEntryPoints { @@ -321,7 +347,7 @@ func (server *Server) startProviders() { log.Infof("Starting provider %v %s", reflect.TypeOf(provider), jsonConf) currentProvider := provider safe.Go(func() { - err := currentProvider.Provide(server.configurationChan, &server.routinesPool, server.globalConfiguration.Constraints) + err := currentProvider.Provide(server.configurationChan, server.routinesPool, server.globalConfiguration.Constraints) if err != nil { log.Errorf("Error starting provider %s", err) } diff --git a/traefik.go b/traefik.go index f29ee8591..e5f65468a 100644 --- a/traefik.go +++ b/traefik.go @@ -21,6 +21,7 @@ import ( "github.com/containous/traefik/types" "github.com/containous/traefik/version" "github.com/docker/libkv/store" + "github.com/satori/go.uuid" ) var versionTemplate = `Version: {{.Version}} @@ -155,8 +156,10 @@ Complete documentation is available at https://traefik.io`, // IF a KV Store is enable and no sub-command called in args if kv != nil && usedCmd == traefikCmd { if traefikConfiguration.Cluster == nil { - hostname, _ := os.Hostname() - traefikConfiguration.Cluster = &types.Cluster{Store: types.Store{Prefix: kv.Prefix, Store: kv.Store}, Node: hostname} + traefikConfiguration.Cluster = &types.Cluster{Node: uuid.NewV4().String()} + } + if traefikConfiguration.Cluster.Store == nil { + traefikConfiguration.Cluster.Store = &types.Store{Prefix: kv.Prefix, Store: kv.Store} } s.AddSource(kv) if _, err := s.LoadConfig(); err != nil { diff --git a/types/types.go b/types/types.go index d24f5a036..e2208180b 100644 --- a/types/types.go +++ b/types/types.go @@ -202,7 +202,7 @@ type Store struct { // Cluster holds cluster config type Cluster struct { Node string - Store Store + Store *Store } // Auth holds authentication configuration (BASIC, DIGEST, users)