From 5a0440d6f81b0ea2a2d93634fe9e7fa36793a256 Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Tue, 16 Aug 2016 19:13:18 +0200 Subject: [PATCH] Add KV datastore Signed-off-by: Emile Vauge --- acme/acme.go | 81 ++++++++++------ acme/challengeProvider.go | 15 +-- cluster/datastore.go | 184 +++++++++++++++++++++++++++++++++++++ cluster/leadership.go | 56 +++++++++++ configuration.go | 21 +++-- glide.lock | 49 ++++++++++ glide.yaml | 3 + integration/consul_test.go | 94 +++++++++++++++++++ provider/boltdb.go | 2 + provider/consul.go | 2 + provider/consul_catalog.go | 2 + provider/docker.go | 2 + provider/etcd.go | 2 + provider/file.go | 2 + provider/kubernetes.go | 2 + provider/marathon.go | 2 + provider/mesos.go | 2 + provider/zk.go | 2 + server.go | 2 +- traefik.go | 6 +- types/types.go | 13 +++ 21 files changed, 498 insertions(+), 46 deletions(-) create mode 100644 cluster/datastore.go create mode 100644 cluster/leadership.go diff --git a/acme/acme.go b/acme/acme.go index 8763cc235..0de565eb4 100644 --- a/acme/acme.go +++ b/acme/acme.go @@ -173,6 +173,7 @@ type ACME struct { storageLock sync.RWMutex client *acme.Client account *Account + defaultCertificate *tls.Certificate } //Domains parse []Domain @@ -216,28 +217,50 @@ type Domain struct { SANs []string } -// CreateConfig creates a tls.config from using ACME configuration -func (a *ACME) CreateConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(domain string) bool) error { +func (a *ACME) init() error { + if len(a.Store) == 0 { + a.Store = a.StorageFile + } acme.Logger = fmtlog.New(ioutil.Discard, "", 0) - - if len(a.StorageFile) == 0 { - return errors.New("Empty StorageFile, please provide a filename for certs storage") - } - log.Debugf("Generating default certificate...") - if len(tlsConfig.Certificates) == 0 { - // no certificates in TLS config, so we add a default one - cert, err := generateDefaultCertificate() - if err != nil { - return err - } - tlsConfig.Certificates = append(tlsConfig.Certificates, *cert) + // no certificates in TLS config, so we add a default one + cert, err := generateDefaultCertificate() + if err != nil { + return err } + a.defaultCertificate = cert + return nil +} + +// CreateClusterConfig creates a tls.config from using ACME configuration +func (a *ACME) CreateClusterConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(domain string) bool) error { + err := a.init() + if err != nil { + return err + } + if len(a.Store) == 0 { + return errors.New("Empty Store, please provide a filename/key for certs storage") + } + tlsConfig.Certificates = append(tlsConfig.Certificates, *a.defaultCertificate) + return nil +} + +// CreateLocalConfig creates a tls.config from using ACME configuration +func (a *ACME) CreateLocalConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(domain string) bool) error { + err := a.init() + if err != nil { + return err + } + if len(a.Store) == 0 { + return errors.New("Empty Store, please provide a filename/key for certs storage") + } + tlsConfig.Certificates = append(tlsConfig.Certificates, *a.defaultCertificate) + var needRegister bool var err error // if certificates in storage, load them - if fileInfo, err := os.Stat(a.StorageFile); err == nil && fileInfo.Size() != 0 { + if fileInfo, fileErr := os.Stat(a.Store); fileErr == nil && fileInfo.Size() != 0 { log.Infof("Loading ACME certificates...") // load account a.account, err = a.loadAccount(a) @@ -265,7 +288,10 @@ func (a *ACME) CreateConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(doma } a.client.ExcludeChallenges([]acme.Challenge{acme.HTTP01, acme.DNS01}) wrapperChallengeProvider := newWrapperChallengeProvider() - a.client.SetChallengeProvider(acme.TLSSNI01, wrapperChallengeProvider) + err = client.SetChallengeProvider(acme.TLSSNI01, wrapperChallengeProvider) + if err != nil { + return err + } if needRegister { // New users will need to register; be sure to save it @@ -322,12 +348,9 @@ func (a *ACME) CreateConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(doma ticker := time.NewTicker(24 * time.Hour) safe.Go(func() { - for { - select { - case <-ticker.C: - if err := a.renewCertificates(a.client); err != nil { - log.Errorf("Error renewing ACME certificate %+v: %s", a.account, err.Error()) - } + for range ticker.C { + if err := a.renewCertificates(client, account); err != nil { + log.Errorf("Error renewing ACME certificate %+v: %s", account, err.Error()) } } @@ -470,22 +493,22 @@ func (a *ACME) LoadCertificateForDomains(domains []string) { func (a *ACME) loadAccount(acmeConfig *ACME) (*Account, error) { a.storageLock.RLock() defer a.storageLock.RUnlock() - Account := Account{ + account := Account{ DomainsCertificate: DomainsCertificates{}, } - file, err := ioutil.ReadFile(acmeConfig.StorageFile) + file, err := ioutil.ReadFile(acmeConfig.Store) if err != nil { return nil, err } - if err := json.Unmarshal(file, &Account); err != nil { + if err := json.Unmarshal(file, &account); err != nil { return nil, err } - err = Account.DomainsCertificate.init() + err = account.DomainsCertificate.init() if err != nil { return nil, err } - log.Infof("Loaded ACME config from storage %s", acmeConfig.StorageFile) - return &Account, nil + log.Infof("Loaded ACME config from store %s", acmeConfig.Store) + return &account, nil } func (a *ACME) saveAccount() error { @@ -496,7 +519,7 @@ func (a *ACME) saveAccount() error { if err != nil { return err } - return ioutil.WriteFile(a.StorageFile, data, 0600) + return ioutil.WriteFile(a.Store, data, 0600) } func (a *ACME) getDomainsCertificates(client *acme.Client, domains []string) (*Certificate, error) { diff --git a/acme/challengeProvider.go b/acme/challengeProvider.go index d9bccf204..1083b5f83 100644 --- a/acme/challengeProvider.go +++ b/acme/challengeProvider.go @@ -8,18 +8,20 @@ import ( "github.com/xenolf/lego/acme" ) -type wrapperChallengeProvider struct { +var _ acme.ChallengeProvider = (*inMemoryChallengeProvider)(nil) + +type inMemoryChallengeProvider struct { challengeCerts map[string]*tls.Certificate lock sync.RWMutex } -func newWrapperChallengeProvider() *wrapperChallengeProvider { - return &wrapperChallengeProvider{ +func newWrapperChallengeProvider() *inMemoryChallengeProvider { + return &inMemoryChallengeProvider{ challengeCerts: map[string]*tls.Certificate{}, } } -func (c *wrapperChallengeProvider) getCertificate(domain string) (cert *tls.Certificate, exists bool) { +func (c *inMemoryChallengeProvider) getCertificate(domain string) (cert *tls.Certificate, exists bool) { c.lock.RLock() defer c.lock.RUnlock() if cert, ok := c.challengeCerts[domain]; ok { @@ -28,7 +30,7 @@ func (c *wrapperChallengeProvider) getCertificate(domain string) (cert *tls.Cert return nil, false } -func (c *wrapperChallengeProvider) Present(domain, token, keyAuth string) error { +func (c *inMemoryChallengeProvider) Present(domain, token, keyAuth string) error { cert, _, err := acme.TLSSNI01ChallengeCert(keyAuth) if err != nil { return err @@ -45,10 +47,9 @@ func (c *wrapperChallengeProvider) Present(domain, token, keyAuth string) error } return nil - } -func (c *wrapperChallengeProvider) CleanUp(domain, token, keyAuth string) error { +func (c *inMemoryChallengeProvider) CleanUp(domain, token, keyAuth string) error { c.lock.Lock() defer c.lock.Unlock() delete(c.challengeCerts, domain) diff --git a/cluster/datastore.go b/cluster/datastore.go new file mode 100644 index 000000000..5f3896772 --- /dev/null +++ b/cluster/datastore.go @@ -0,0 +1,184 @@ +package cluster + +import ( + "fmt" + log "github.com/Sirupsen/logrus" + "github.com/cenkalti/backoff" + "github.com/containous/staert" + "github.com/docker/libkv/store" + "github.com/satori/go.uuid" + "golang.org/x/net/context" + "sync" + "time" +) + +// Object is the struct to store +type Object interface{} + +// Metadata stores Object plus metadata +type Metadata struct { + Lock string +} + +// Datastore holds a struct synced in a KV store +type Datastore struct { + kv *staert.KvSource + ctx context.Context + localLock *sync.RWMutex + object Object + meta *Metadata + lockKey string +} + +// NewDataStore creates a Datastore +func NewDataStore(kvSource *staert.KvSource, ctx context.Context, object Object) (*Datastore, error) { + datastore := Datastore{ + kv: kvSource, + ctx: ctx, + meta: &Metadata{}, + object: object, + lockKey: kvSource.Prefix + "/lock", + localLock: &sync.RWMutex{}, + } + err := datastore.watchChanges() + if err != nil { + return nil, err + } + return &datastore, nil +} + +func (d *Datastore) watchChanges() error { + stopCh := make(chan struct{}) + kvCh, err := d.kv.Watch(d.lockKey, stopCh) + if err != nil { + return err + } + go func() { + ctx, cancel := context.WithCancel(d.ctx) + operation := func() error { + for { + select { + case <-ctx.Done(): + stopCh <- struct{}{} + return nil + case _, ok := <-kvCh: + if !ok { + cancel() + return err + } + d.localLock.Lock() + err := d.kv.LoadConfig(d.object) + if err != nil { + d.localLock.Unlock() + return err + } + err = d.kv.LoadConfig(d.meta) + if err != nil { + d.localLock.Unlock() + return err + } + d.localLock.Unlock() + } + } + } + notify := func(err error, time time.Duration) { + log.Errorf("Error in watch datastore: %+v, retrying in %s", err, time) + } + err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) + if err != nil { + log.Errorf("Error in watch datastore: %v", err) + } + }() + return nil +} + +// Begin creates a transaction with the KV store. +func (d *Datastore) Begin() (*Transaction, error) { + value := uuid.NewV4().String() + remoteLock, err := d.kv.NewLock(d.lockKey, &store.LockOptions{TTL: 20 * time.Second, Value: []byte(value)}) + if err != nil { + return nil, err + } + stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(d.ctx) + var errLock error + go func() { + _, errLock = remoteLock.Lock(stopCh) + cancel() + }() + select { + case <-ctx.Done(): + if errLock != nil { + return nil, errLock + } + case <-d.ctx.Done(): + stopCh <- struct{}{} + return nil, d.ctx.Err() + } + + // we got the lock! Now make sure we are synced with KV store + operation := func() error { + meta := d.get() + if meta.Lock != value { + return fmt.Errorf("Object lock value: expected %s, got %s", value, meta.Lock) + } + return nil + } + notify := func(err error, time time.Duration) { + log.Errorf("Datastore sync error: %v, retrying in %s", err, time) + } + ebo := backoff.NewExponentialBackOff() + ebo.MaxElapsedTime = 60 * time.Second + err = backoff.RetryNotify(operation, ebo, notify) + if err != nil { + return nil, fmt.Errorf("Datastore cannot sync: %v", err) + } + + // we synced with KV store, we can now return Setter + return &Transaction{ + Datastore: d, + remoteLock: remoteLock, + }, nil +} + +func (d *Datastore) get() *Metadata { + d.localLock.RLock() + defer d.localLock.RUnlock() + return d.meta +} + +// Get atomically a struct from the KV store +func (d *Datastore) Get() Object { + d.localLock.RLock() + defer d.localLock.RUnlock() + return d.object +} + +// Transaction allows to set a struct in the KV store +type Transaction struct { + *Datastore + remoteLock store.Locker + dirty bool +} + +// Commit allows to set an object in the KV store +func (s *Transaction) Commit(object Object) error { + s.localLock.Lock() + defer s.localLock.Unlock() + if s.dirty { + return fmt.Errorf("Transaction already used. Please begin a new one.") + } + err := s.kv.StoreConfig(object) + if err != nil { + return err + } + + err = s.remoteLock.Unlock() + if err != nil { + return err + } + + s.Datastore.object = object + s.dirty = true + return nil +} diff --git a/cluster/leadership.go b/cluster/leadership.go new file mode 100644 index 000000000..851e90010 --- /dev/null +++ b/cluster/leadership.go @@ -0,0 +1,56 @@ +package cluster + +import ( + log "github.com/Sirupsen/logrus" + "github.com/cenkalti/backoff" + "github.com/containous/traefik/safe" + "github.com/containous/traefik/types" + "github.com/docker/leadership" + "time" +) + +// Leadership allows leadership election using a KV store +type Leadership struct { + types.Cluster + candidate *leadership.Candidate +} + +// Participate tries to be a leader +func (l *Leadership) Participate(pool *safe.Pool, isElected func(bool)) { + pool.Go(func(stop chan bool) { + l.candidate = leadership.NewCandidate(l.Store, l.Store.Prefix+"/leader", l.Node, 30*time.Second) + backOff := backoff.NewExponentialBackOff() + operation := func() error { + return l.run(l.candidate, stop, isElected) + } + + notify := func(err error, time time.Duration) { + log.Errorf("Leadership election error %+v, retrying in %s", err, time) + } + err := backoff.RetryNotify(operation, backOff, notify) + if err != nil { + log.Errorf("Cannot elect leadership %+v", err) + } + }) +} + +// Resign resigns from being a leader +func (l *Leadership) Resign() { + if l.candidate != nil { + l.candidate.Resign() + } +} + +func (l *Leadership) run(candidate *leadership.Candidate, stop chan bool, isElected func(bool)) error { + electedCh, errCh := candidate.RunForElection() + for { + select { + case elected := <-electedCh: + isElected(elected) + case err := <-errCh: + return err + case <-stop: + return nil + } + } +} diff --git a/configuration.go b/configuration.go index 1f98bce9e..558ce2932 100644 --- a/configuration.go +++ b/configuration.go @@ -23,12 +23,13 @@ type TraefikConfiguration struct { // GlobalConfiguration holds global configuration (with providers, etc.). // It's populated from the traefik configuration file passed as an argument to the binary. type GlobalConfiguration struct { - GraceTimeOut int64 `short:"g" description:"Duration to give active requests a chance to finish during hot-reload"` - Debug bool `short:"d" description:"Enable debug mode"` - AccessLogsFile string `description:"Access logs file"` - TraefikLogsFile string `description:"Traefik logs file"` - LogLevel string `short:"l" description:"Log level"` - EntryPoints EntryPoints `description:"Entrypoints definition using format: --entryPoints='Name:http Address::8000 Redirect.EntryPoint:https' --entryPoints='Name:https Address::4442 TLS:tests/traefik.crt,tests/traefik.key'"` + GraceTimeOut int64 `short:"g" description:"Duration to give active requests a chance to finish during hot-reload"` + Debug bool `short:"d" description:"Enable debug mode"` + AccessLogsFile string `description:"Access logs file"` + TraefikLogsFile string `description:"Traefik logs file"` + LogLevel string `short:"l" description:"Log level"` + EntryPoints EntryPoints `description:"Entrypoints definition using format: --entryPoints='Name:http Address::8000 Redirect.EntryPoint:https' --entryPoints='Name:https Address::4442 TLS:tests/traefik.crt,tests/traefik.key'"` + Cluster *types.Cluster Constraints types.Constraints `description:"Filter services by constraint, matching with service tags."` ACME *acme.ACME `description:"Enable ACME (Let's Encrypt): automatic SSL"` DefaultEntryPoints DefaultEntryPoints `description:"Entrypoints to be used by frontends that do not specify any entrypoint"` @@ -73,7 +74,9 @@ func (dep *DefaultEntryPoints) Set(value string) error { } // Get return the EntryPoints map -func (dep *DefaultEntryPoints) Get() interface{} { return DefaultEntryPoints(*dep) } +func (dep *DefaultEntryPoints) Get() interface{} { + return DefaultEntryPoints(*dep) +} // SetValue sets the EntryPoints map with val func (dep *DefaultEntryPoints) SetValue(val interface{}) { @@ -153,7 +156,9 @@ func (ep *EntryPoints) Set(value string) error { } // Get return the EntryPoints map -func (ep *EntryPoints) Get() interface{} { return EntryPoints(*ep) } +func (ep *EntryPoints) Get() interface{} { + return EntryPoints(*ep) +} // SetValue sets the EntryPoints map with val func (ep *EntryPoints) SetValue(val interface{}) { diff --git a/glide.lock b/glide.lock index 4097907e3..f447712ec 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,14 @@ +<<<<<<< a13549cc28273ba5c15a739fa4aaeb3e0f7216a4 hash: c0ac205a859d78847e21d3cd63f427ffba985755c6ae84373e4a20364ba39b05 +<<<<<<< 38b62d4ae311e2d5247065cbc2c09421a2bb81ab updated: 2016-09-30T10:57:42.336729457+02:00 +======= +updated: 2016-09-28T16:50:04.352639437+01:00 +======= +hash: 809b3fa812ca88940fdc15530804a4bcd881708e4819fed5aa45c42c871ba5cf +updated: 2016-09-20T14:50:04.029710103+02:00 +>>>>>>> Add KV datastore +>>>>>>> Add KV datastore imports: - name: github.com/abbot/go-http-auth version: cb4372376e1e00e9f6ab9ec142e029302c9e7140 @@ -36,7 +45,15 @@ imports: subpackages: - spew - name: github.com/docker/distribution +<<<<<<< 38b62d4ae311e2d5247065cbc2c09421a2bb81ab version: 87917f30529e6a7fca8eaff2932424915fb11225 +======= +<<<<<<< a13549cc28273ba5c15a739fa4aaeb3e0f7216a4 + version: 99cb7c0946d2f5a38015443e515dc916295064d7 +======= + version: 857d0f15c0a4d8037175642e0ca3660829551cb5 +>>>>>>> Add KV datastore +>>>>>>> Add KV datastore subpackages: - context - digest @@ -122,7 +139,13 @@ imports: - sockets - tlsconfig - name: github.com/docker/go-units +<<<<<<< 38b62d4ae311e2d5247065cbc2c09421a2bb81ab version: f2d77a61e3c169b43402a0a1e84f06daf29b8190 +======= + version: f2145db703495b2e525c59662db69a7344b00bb8 +- name: github.com/docker/leadership + version: bfc7753dd48af19513b29deec23c364bf0f274eb +>>>>>>> Add KV datastore - name: github.com/docker/libcompose version: d1876c1d68527a49c0aac22a0b161acc7296b740 subpackages: @@ -141,7 +164,11 @@ imports: - version - yaml - name: github.com/docker/libkv +<<<<<<< 38b62d4ae311e2d5247065cbc2c09421a2bb81ab version: 35d3e2084c650109e7bcc7282655b1bc8ba924ff +======= + version: aabc039ad04deb721e234f99cd1b4aa28ac71a40 +>>>>>>> Add KV datastore subpackages: - store - store/boltdb @@ -157,7 +184,15 @@ imports: - name: github.com/go-check/check version: 4f90aeace3a26ad7021961c297b22c42160c7b25 - name: github.com/gogo/protobuf +<<<<<<< 38b62d4ae311e2d5247065cbc2c09421a2bb81ab version: e33835a643a970c11ac74f6333f5f6866387a101 +======= +<<<<<<< a13549cc28273ba5c15a739fa4aaeb3e0f7216a4 + version: 89f1976ff373a3e549675d2f212c10f98b6c6316 +======= + version: e57a569e1882958f6b188cb42231d6db87701f2a +>>>>>>> Add KV datastore +>>>>>>> Add KV datastore subpackages: - proto - name: github.com/golang/glog @@ -220,7 +255,11 @@ imports: - name: github.com/miekg/dns version: 5d001d020961ae1c184f9f8152fdc73810481677 - name: github.com/mitchellh/mapstructure +<<<<<<< 38b62d4ae311e2d5247065cbc2c09421a2bb81ab version: d2dd0262208475919e1a362f675cfc0e7c10e905 +======= + version: 21a35fb16463dfb7c8eee579c65d995d95e64d1e +>>>>>>> Add KV datastore - name: github.com/moul/http2curl version: b1479103caacaa39319f75e7f57fc545287fca0d - name: github.com/NYTimes/gziphandler @@ -228,7 +267,15 @@ imports: - name: github.com/ogier/pflag version: 45c278ab3607870051a2ea9040bb85fcb8557481 - name: github.com/opencontainers/runc +<<<<<<< 38b62d4ae311e2d5247065cbc2c09421a2bb81ab version: 1a81e9ab1f138c091fe5c86d0883f87716088527 +======= +<<<<<<< a13549cc28273ba5c15a739fa4aaeb3e0f7216a4 + version: d9fec4c63b089ddfc267194ecb6cda58a13f072c +======= + version: ff88baa42fa5b2a1568a3a14665142fb4bdb3a2a +>>>>>>> Add KV datastore +>>>>>>> Add KV datastore subpackages: - libcontainer/user - name: github.com/parnurzeal/gorequest @@ -243,6 +290,8 @@ imports: version: e64db453f3512cade908163702045e0f31137843 subpackages: - zk +- name: github.com/satori/go.uuid + version: 879c5887cd475cd7864858769793b2ceb0d44feb - name: github.com/Sirupsen/logrus version: a283a10442df8dc09befd873fab202bf8a253d6a - name: github.com/streamrail/concurrent-map diff --git a/glide.yaml b/glide.yaml index 7f0c63588..9f1c7b4e6 100644 --- a/glide.yaml +++ b/glide.yaml @@ -99,3 +99,6 @@ import: - package: github.com/miekg/dns version: 5d001d020961ae1c184f9f8152fdc73810481677 - package: github.com/NYTimes/gziphandler +- package: github.com/docker/leadership +- package: github.com/satori/go.uuid + version: ^1.1.0 diff --git a/integration/consul_test.go b/integration/consul_test.go index e51479047..ce60c38f9 100644 --- a/integration/consul_test.go +++ b/integration/consul_test.go @@ -5,18 +5,22 @@ import ( "os/exec" "time" + "github.com/containous/staert" "github.com/docker/libkv" "github.com/docker/libkv/store" "github.com/docker/libkv/store/consul" "github.com/go-check/check" + "golang.org/x/net/context" "errors" + "github.com/containous/traefik/cluster" "github.com/containous/traefik/integration/utils" "github.com/containous/traefik/provider" checker "github.com/vdemeester/shakers" "io/ioutil" "os" "strings" + "sync" ) // Consul test suites (using libcompose) @@ -427,3 +431,93 @@ func (s *ConsulSuite) TestCommandStoreConfig(c *check.C) { } } + +type TestStruct struct { + String string + Int int +} + +func (s *ConsulSuite) TestDatastore(c *check.C) { + s.setupConsul(c) + consulHost := s.composeProject.Container(c, "consul").NetworkSettings.IPAddress + kvSource, err := staert.NewKvSource(store.CONSUL, []string{consulHost + ":8500"}, &store.Config{ + ConnectionTimeout: 10 * time.Second, + }, "traefik") + c.Assert(err, checker.IsNil) + + ctx := context.Background() + datastore1, err := cluster.NewDataStore(kvSource, ctx, &TestStruct{}) + c.Assert(err, checker.IsNil) + datastore2, err := cluster.NewDataStore(kvSource, ctx, &TestStruct{}) + c.Assert(err, checker.IsNil) + + setter1, err := datastore1.Begin() + c.Assert(err, checker.IsNil) + err = setter1.Commit(&TestStruct{ + String: "foo", + Int: 1, + }) + c.Assert(err, checker.IsNil) + time.Sleep(2 * time.Second) + test1 := datastore1.Get().(*TestStruct) + c.Assert(test1.String, checker.Equals, "foo") + + test2 := datastore2.Get().(*TestStruct) + c.Assert(test2.String, checker.Equals, "foo") + + setter2, err := datastore2.Begin() + c.Assert(err, checker.IsNil) + err = setter2.Commit(&TestStruct{ + String: "bar", + Int: 2, + }) + c.Assert(err, checker.IsNil) + time.Sleep(2 * time.Second) + test1 = datastore1.Get().(*TestStruct) + c.Assert(test1.String, checker.Equals, "bar") + + test2 = datastore2.Get().(*TestStruct) + c.Assert(test2.String, checker.Equals, "bar") + + wg := &sync.WaitGroup{} + wg.Add(4) + go func() { + for i := 0; i < 100; i++ { + setter1, err := datastore1.Begin() + c.Assert(err, checker.IsNil) + err = setter1.Commit(&TestStruct{ + String: "datastore1", + Int: i, + }) + c.Assert(err, checker.IsNil) + } + wg.Done() + }() + go func() { + for i := 0; i < 100; i++ { + setter2, err := datastore2.Begin() + c.Assert(err, checker.IsNil) + err = setter2.Commit(&TestStruct{ + String: "datastore2", + Int: i, + }) + c.Assert(err, checker.IsNil) + } + wg.Done() + }() + go func() { + for i := 0; i < 100; i++ { + test1 := datastore1.Get().(*TestStruct) + c.Assert(test1, checker.NotNil) + } + wg.Done() + }() + go func() { + for i := 0; i < 100; i++ { + test2 := datastore2.Get().(*TestStruct) + c.Assert(test2, checker.NotNil) + } + wg.Done() + }() + wg.Wait() +} diff --git a/provider/boltdb.go b/provider/boltdb.go index 5220fca98..3347fe340 100644 --- a/provider/boltdb.go +++ b/provider/boltdb.go @@ -8,6 +8,8 @@ import ( "github.com/docker/libkv/store/boltdb" ) +var _ Provider = (*BoltDb)(nil) + // BoltDb holds configurations of the BoltDb provider. type BoltDb struct { Kv `mapstructure:",squash"` diff --git a/provider/consul.go b/provider/consul.go index 4cc8b2851..1b628dd4f 100644 --- a/provider/consul.go +++ b/provider/consul.go @@ -8,6 +8,8 @@ import ( "github.com/docker/libkv/store/consul" ) +var _ Provider = (*Consul)(nil) + // Consul holds configurations of the Consul provider. type Consul struct { Kv `mapstructure:",squash"` diff --git a/provider/consul_catalog.go b/provider/consul_catalog.go index 328a686df..1d1d4c928 100644 --- a/provider/consul_catalog.go +++ b/provider/consul_catalog.go @@ -24,6 +24,8 @@ const ( DefaultConsulCatalogTagPrefix = "traefik" ) +var _ Provider = (*ConsulCatalog)(nil) + // ConsulCatalog holds configurations of the Consul catalog provider. type ConsulCatalog struct { BaseProvider `mapstructure:",squash"` diff --git a/provider/docker.go b/provider/docker.go index f816b3130..73f897244 100644 --- a/provider/docker.go +++ b/provider/docker.go @@ -40,6 +40,8 @@ const ( SwarmDefaultWatchTime = 15 * time.Second ) +var _ Provider = (*Docker)(nil) + // Docker holds configurations of the Docker provider. type Docker struct { BaseProvider `mapstructure:",squash"` diff --git a/provider/etcd.go b/provider/etcd.go index 0165bf3c1..9c7726baa 100644 --- a/provider/etcd.go +++ b/provider/etcd.go @@ -8,6 +8,8 @@ import ( "github.com/docker/libkv/store/etcd" ) +var _ Provider = (*Etcd)(nil) + // Etcd holds configurations of the Etcd provider. type Etcd struct { Kv `mapstructure:",squash"` diff --git a/provider/file.go b/provider/file.go index 05cb89eee..dc42b26d5 100644 --- a/provider/file.go +++ b/provider/file.go @@ -12,6 +12,8 @@ import ( "gopkg.in/fsnotify.v1" ) +var _ Provider = (*File)(nil) + // File holds configurations of the File provider. type File struct { BaseProvider `mapstructure:",squash"` diff --git a/provider/kubernetes.go b/provider/kubernetes.go index e1509ac4a..a5c8654f6 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -50,6 +50,8 @@ func (ns *Namespaces) SetValue(val interface{}) { *ns = Namespaces(val.(Namespaces)) } +var _ Provider = (*Kubernetes)(nil) + // Kubernetes holds configurations of the Kubernetes provider. type Kubernetes struct { BaseProvider `mapstructure:",squash"` diff --git a/provider/marathon.go b/provider/marathon.go index 10b99f999..c06b49309 100644 --- a/provider/marathon.go +++ b/provider/marathon.go @@ -21,6 +21,8 @@ import ( "github.com/gambol99/go-marathon" ) +var _ Provider = (*Marathon)(nil) + // Marathon holds configuration of the Marathon provider. type Marathon struct { BaseProvider diff --git a/provider/mesos.go b/provider/mesos.go index 623fe1acc..2396cf018 100644 --- a/provider/mesos.go +++ b/provider/mesos.go @@ -24,6 +24,8 @@ import ( "time" ) +var _ Provider = (*Mesos)(nil) + //Mesos holds configuration of the mesos provider. type Mesos struct { BaseProvider diff --git a/provider/zk.go b/provider/zk.go index 467f59e2f..19f3654e5 100644 --- a/provider/zk.go +++ b/provider/zk.go @@ -8,6 +8,8 @@ import ( "github.com/docker/libkv/store/zookeeper" ) +var _ Provider = (*Zookepper)(nil) + // Zookepper holds configurations of the Zookepper provider. type Zookepper struct { Kv `mapstructure:",squash"` diff --git a/server.go b/server.go index 9587ba872..48c8c3ffb 100644 --- a/server.go +++ b/server.go @@ -375,7 +375,7 @@ func (server *Server) createTLSConfig(entryPointName string, tlsOption *TLS, rou } return false } - err := server.globalConfiguration.ACME.CreateConfig(config, checkOnDemandDomain) + err := server.globalConfiguration.ACME.CreateLocalConfig(config, checkOnDemandDomain) if err != nil { return nil, err } diff --git a/traefik.go b/traefik.go index e20e11456..f29ee8591 100644 --- a/traefik.go +++ b/traefik.go @@ -154,6 +154,10 @@ Complete documentation is available at https://traefik.io`, // IF a KV Store is enable and no sub-command called in args if kv != nil && usedCmd == traefikCmd { + if traefikConfiguration.Cluster == nil { + hostname, _ := os.Hostname() + traefikConfiguration.Cluster = &types.Cluster{Store: types.Store{Prefix: kv.Prefix, Store: kv.Store}, Node: hostname} + } s.AddSource(kv) if _, err := s.LoadConfig(); err != nil { fmtlog.Println(err) @@ -235,7 +239,7 @@ func run(traefikConfiguration *TraefikConfiguration) { } // CreateKvSource creates KvSource -// TLS support is enable for Consul and ects backends +// TLS support is enable for Consul and Etcd backends func CreateKvSource(traefikConfiguration *TraefikConfiguration) (*staert.KvSource, error) { var kv *staert.KvSource var store store.Store diff --git a/types/types.go b/types/types.go index d5a0f0ead..d24f5a036 100644 --- a/types/types.go +++ b/types/types.go @@ -3,6 +3,7 @@ package types import ( "errors" "fmt" + "github.com/docker/libkv/store" "github.com/ryanuber/go-glob" "strings" ) @@ -192,6 +193,18 @@ func (cs *Constraints) Type() string { return fmt.Sprint("constraint") } +// Store holds KV store cluster config +type Store struct { + store.Store + Prefix string // like this "prefix" (without the /) +} + +// Cluster holds cluster config +type Cluster struct { + Node string + Store Store +} + // Auth holds authentication configuration (BASIC, DIGEST, users) type Auth struct { Basic *Basic