b0efd685a9
Recent additions to golint mean that a number of files cause the build to start failing if they are edited (we only run the linter against changed files). This fixes all the errors in the repo so things don't unexpectedly start failing for people making PRs.
102 lines
2.6 KiB
Go
102 lines
2.6 KiB
Go
package cluster
|
|
|
|
import (
|
|
"github.com/cenk/backoff"
|
|
"github.com/containous/traefik/log"
|
|
"github.com/containous/traefik/safe"
|
|
"github.com/containous/traefik/types"
|
|
"github.com/docker/leadership"
|
|
"golang.org/x/net/context"
|
|
"time"
|
|
)
|
|
|
|
// Leadership allows leadership election using a KV store
|
|
type Leadership struct {
|
|
*safe.Pool
|
|
*types.Cluster
|
|
candidate *leadership.Candidate
|
|
leader safe.Safe
|
|
listeners []LeaderListener
|
|
}
|
|
|
|
// NewLeadership creates a leadership
|
|
func NewLeadership(ctx context.Context, cluster *types.Cluster) *Leadership {
|
|
return &Leadership{
|
|
Pool: safe.NewPool(ctx),
|
|
Cluster: cluster,
|
|
candidate: leadership.NewCandidate(cluster.Store, cluster.Store.Prefix+"/leader", cluster.Node, 20*time.Second),
|
|
listeners: []LeaderListener{},
|
|
}
|
|
}
|
|
|
|
// LeaderListener is a callback invoked whenever the leadership status of the
// node changes. elected is true when the node has become leader and false
// otherwise; the returned error is reported back to the caller.
type LeaderListener func(elected bool) error
|
|
|
|
// Participate tries to be a leader
|
|
func (l *Leadership) Participate(pool *safe.Pool) {
|
|
pool.GoCtx(func(ctx context.Context) {
|
|
log.Debugf("Node %s running for election", l.Cluster.Node)
|
|
defer log.Debugf("Node %s no more running for election", l.Cluster.Node)
|
|
backOff := backoff.NewExponentialBackOff()
|
|
operation := func() error {
|
|
return l.run(ctx, l.candidate)
|
|
}
|
|
|
|
notify := func(err error, time time.Duration) {
|
|
log.Errorf("Leadership election error %+v, retrying in %s", err, time)
|
|
}
|
|
err := backoff.RetryNotify(operation, backOff, notify)
|
|
if err != nil {
|
|
log.Errorf("Cannot elect leadership %+v", err)
|
|
}
|
|
})
|
|
}
|
|
|
|
// AddListener adds a leadership listerner
|
|
func (l *Leadership) AddListener(listener LeaderListener) {
|
|
l.listeners = append(l.listeners, listener)
|
|
}
|
|
|
|
// Resign resigns from being a leader
|
|
func (l *Leadership) Resign() {
|
|
l.candidate.Resign()
|
|
log.Infof("Node %s resigned", l.Cluster.Node)
|
|
}
|
|
|
|
func (l *Leadership) run(ctx context.Context, candidate *leadership.Candidate) error {
|
|
electedCh, errCh := candidate.RunForElection()
|
|
for {
|
|
select {
|
|
case elected := <-electedCh:
|
|
l.onElection(elected)
|
|
case err := <-errCh:
|
|
return err
|
|
case <-ctx.Done():
|
|
l.candidate.Resign()
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *Leadership) onElection(elected bool) {
|
|
if elected {
|
|
log.Infof("Node %s elected leader ♚", l.Cluster.Node)
|
|
l.leader.Set(true)
|
|
l.Start()
|
|
} else {
|
|
log.Infof("Node %s elected slave ♝", l.Cluster.Node)
|
|
l.leader.Set(false)
|
|
l.Stop()
|
|
}
|
|
for _, listener := range l.listeners {
|
|
err := listener(elected)
|
|
if err != nil {
|
|
log.Errorf("Error calling Leadership listener: %s", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// IsLeader returns true if current node is leader
|
|
func (l *Leadership) IsLeader() bool {
|
|
return l.leader.Get().(bool)
|
|
}
|