From c1078c4374bada4a69f1fac823d2c5cde59b6fdf Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Wed, 13 Apr 2016 20:36:23 +0200 Subject: [PATCH] Fix races Signed-off-by: Emile Vauge --- glide.lock | 32 ++++---- glide.yaml | 1 + middlewares/handlerSwitcher.go | 19 ++--- middlewares/logger.go | 4 +- provider/boltdb.go | 5 +- provider/consul.go | 5 +- provider/consul_catalog.go | 10 ++- provider/docker.go | 4 +- provider/etcd.go | 5 +- provider/file.go | 6 +- provider/kv.go | 17 +++-- provider/kv_test.go | 2 +- provider/marathon.go | 22 +++--- provider/provider.go | 3 +- provider/zk.go | 5 +- safe/routine.go | 70 ++++++++++++++++++ safe/safe.go | 38 +++++----- server.go | 131 +++++++++++++++++++-------------- web.go | 33 ++++++--- 19 files changed, 269 insertions(+), 143 deletions(-) create mode 100644 safe/routine.go diff --git a/glide.lock b/glide.lock index 63a841cf1..9b10d0ddc 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 79b6eb2a613b5e2ce5c57150eec41ac04def3f232a3613fd8b5a88b5e1041b38 -updated: 2016-04-02T15:42:37.505896092+02:00 +hash: fffa87220825895f7e3a6ceed3b13ecbf6bc934ab072fc9be3d00e3eef411ecb +updated: 2016-04-13T14:05:41.300658168+02:00 imports: - name: github.com/alecthomas/template version: b867cc6ab45cece8143cfcc6fc9c77cf3f2c23c0 @@ -42,6 +42,7 @@ imports: version: ff6f38ccb69afa96214c7ee955359465d1fc767a subpackages: - reference + - digest - name: github.com/docker/docker version: f39987afe8d611407887b3094c03d6ba6a766a67 subpackages: @@ -82,18 +83,21 @@ imports: - utils - volume - name: github.com/docker/engine-api - version: 8924d6900370b4c7e7984be5adc61f50a80d7537 + version: 87b3df23dcba0ce02bfe0474e29a08a97f7814e6 subpackages: - client - types - types/container - types/filters - types/strslice + - types/events - client/transport - client/transport/cancellable - types/network + - types/reference - types/registry - types/time + - types/versions - types/blkiodev - name: github.com/docker/go-connections version: f549a9393d05688dff0992ef3efd8bbe6c628aeb @@ -128,7 +132,7 @@ imports: - name: github.com/golang/glog version: fca8c8854093a154ff1eb580aae10276ad6b1b5f - name: github.com/google/go-querystring - version: 6bb77fe6f42b85397288d4f6f67ac72f8f400ee7 + version: 9235644dd9e52eeae6fa48efd539fdc351a0af53 subpackages: - query - name: github.com/gorilla/context @@ -171,9 +175,9 @@ imports: - name: github.com/mailgun/timetools version: fd192d755b00c968d312d23f521eb0cdc6f66bd0 - name: github.com/Microsoft/go-winio - version: 9e2895e5f6c3f16473b91d37fae6e89990a4520c + version: 862b6557927a5c5c81e411c12aa6de7e566cbb7a - name: github.com/miekg/dns - version: 7e024ce8ce18b21b475ac6baf8fa3c42536bf2fa + version: dd83d5cbcfd986f334b2747feeb907e281318fdf - name: github.com/mitchellh/mapstructure version: d2dd0262208475919e1a362f675cfc0e7c10e905 - name: github.com/opencontainers/runc @@ -193,19 +197,21 @@ imports: - name: github.com/spf13/cast version: ee7b3e0353166ab1f3a605294ac8cd2b77953778 - name: github.com/spf13/cobra - version: 2ccf9e982a3e3eb21eba9c9ad8e546529fd74c71 + version: 4c05eb1145f16d0e6bb4a3e1b6d769f4713cb41f subpackages: - cobra - name: github.com/spf13/jwalterweatherman version: 33c24e77fb80341fe7130ee7c594256ff08ccc46 - name: github.com/spf13/pflag - version: 7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7 + version: 1f296710f879815ad9e6d39d947c828c3e4b4c3d - name: github.com/spf13/viper version: a212099cbe6fbe8d07476bfda8d2d39b6ff8f325 +- name: github.com/streamrail/concurrent-map + version: 788b276dc7eabf20890ea3fa280956664d58b329 - name: github.com/stretchr/objx version: cbeaeb16a013161a98496fad62933b1d21786672 - name: github.com/stretchr/testify - version: 6fe211e493929a8aac0469b93f28b1d0688a9a3a + version: bcd9e3389dd03b0b668d11f4d462a6af6c2dfd60 subpackages: - mock - assert @@ -214,13 +220,13 @@ imports: - name: github.com/unrolled/render version: 26b4e3aac686940fe29521545afad9966ddfc80c - name: github.com/vdemeester/docker-events - version: bd72e1848b08db4b5ed1a2e9277621b9f5e5d1f3 + version: 6ea3f28df37f29a47498bc8b32b36ad8491dbd37 - name: github.com/vdemeester/libkermit version: 7e4e689a6fa9281e0fb9b7b9c297e22d5342a5ec - name: github.com/vdemeester/shakers version: 24d7f1d6a71aa5d9cbe7390e4afb66b7eef9e1b3 - name: github.com/vulcand/oxy - version: 8aaf36279137ac04ace3792a4f86098631b27d5a + version: 11677428db34c4a05354d66d028174d0e3c6e905 subpackages: - memmetrics - utils @@ -237,11 +243,11 @@ imports: - name: github.com/wendal/errors version: f66c77a7882b399795a8987ebf87ef64a427417e - name: github.com/xenolf/lego - version: ca19a90028e242e878585941c2a27c8f3b3efc25 + version: 23e88185c255e95a106835d80e76e5a3a66d7c54 subpackages: - acme - name: golang.org/x/crypto - version: 9e7f5dc375abeb9619ea3c5c58502c428f457aa2 + version: d68c3ecb62c850b645dc072a8d78006286bf81ca subpackages: - ocsp - name: golang.org/x/net diff --git a/glide.yaml b/glide.yaml index 855a06541..1cd5b5719 100644 --- a/glide.yaml +++ b/glide.yaml @@ -174,3 +174,4 @@ import: - tlsconfig - package: github.com/docker/go-units - package: github.com/mailgun/multibuf + - package: github.com/streamrail/concurrent-map diff --git a/middlewares/handlerSwitcher.go b/middlewares/handlerSwitcher.go index 9865cc0c6..81dfacd58 100644 --- a/middlewares/handlerSwitcher.go +++ b/middlewares/handlerSwitcher.go @@ -1,40 +1,35 @@ package middlewares import ( + "github.com/containous/traefik/safe" "github.com/gorilla/mux" "net/http" - "sync" ) // HandlerSwitcher allows hot switching of http.ServeMux type HandlerSwitcher struct { - handler *mux.Router - handlerLock *sync.Mutex + handler *safe.Safe } // NewHandlerSwitcher builds a new instance of HandlerSwitcher func NewHandlerSwitcher(newHandler *mux.Router) (hs *HandlerSwitcher) { return &HandlerSwitcher{ - handler: newHandler, - handlerLock: &sync.Mutex{}, + handler: safe.New(newHandler), } } func (hs *HandlerSwitcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) { - hs.handlerLock.Lock() - handlerBackup := hs.handler - hs.handlerLock.Unlock() + handlerBackup := hs.handler.Get().(*mux.Router) handlerBackup.ServeHTTP(rw, r) } // GetHandler returns the current http.ServeMux func (hs *HandlerSwitcher) GetHandler() (newHandler *mux.Router) { - return hs.handler + handler := hs.handler.Get().(*mux.Router) + return handler } // UpdateHandler safely updates the current http.ServeMux with a new one func (hs *HandlerSwitcher) UpdateHandler(newHandler *mux.Router) { - hs.handlerLock.Lock() - hs.handler = newHandler - defer hs.handlerLock.Unlock() + hs.handler.Set(newHandler) } diff --git a/middlewares/logger.go b/middlewares/logger.go index f569cfcb5..34baf6f9f 100644 --- a/middlewares/logger.go +++ b/middlewares/logger.go @@ -35,5 +35,7 @@ func (l *Logger) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http.Ha // Close closes the logger (i.e. the file). func (l *Logger) Close() { - l.file.Close() + if l.file != nil { + l.file.Close() + } } diff --git a/provider/boltdb.go b/provider/boltdb.go index 2f2586cd9..0f941627e 100644 --- a/provider/boltdb.go +++ b/provider/boltdb.go @@ -1,6 +1,7 @@ package provider import ( + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/docker/libkv/store" "github.com/docker/libkv/store/boltdb" @@ -13,8 +14,8 @@ type BoltDb struct { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (provider *BoltDb) Provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *BoltDb) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { provider.storeType = store.BOLTDB boltdb.Register() - return provider.provide(configurationChan) + return provider.provide(configurationChan, pool) } diff --git a/provider/consul.go b/provider/consul.go index 03ea49d72..a2df6852b 100644 --- a/provider/consul.go +++ b/provider/consul.go @@ -1,6 +1,7 @@ package provider import ( + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/docker/libkv/store" "github.com/docker/libkv/store/consul" @@ -13,8 +14,8 @@ type Consul struct { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (provider *Consul) Provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *Consul) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { provider.storeType = store.CONSUL consul.Register() - return provider.provide(configurationChan) + return provider.provide(configurationChan, pool) } diff --git a/provider/consul_catalog.go b/provider/consul_catalog.go index 23c5fe6dd..6f167be87 100644 --- a/provider/consul_catalog.go +++ b/provider/consul_catalog.go @@ -189,7 +189,7 @@ func (provider *ConsulCatalog) getNodes(index map[string][]string) ([]catalogUpd return nodes, nil } -func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessage) error { +func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error { stopCh := make(chan struct{}) serviceCatalog := provider.watchServices(stopCh) @@ -197,6 +197,8 @@ func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessag for { select { + case <-stop: + return nil case index, ok := <-serviceCatalog: if !ok { return errors.New("Consul service list nil") @@ -217,7 +219,7 @@ func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessag // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { config := api.DefaultConfig() config.Address = provider.Endpoint client, err := api.NewClient(config) @@ -226,12 +228,12 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess } provider.client = client - safe.Go(func() { + pool.Go(func(stop chan bool) { notify := func(err error, time time.Duration) { log.Errorf("Consul connection error %+v, retrying in %s", err, time) } worker := func() error { - return provider.watch(configurationChan) + return provider.watch(configurationChan, stop) } err := backoff.RetryNotify(worker, backoff.NewExponentialBackOff(), notify) if err != nil { diff --git a/provider/docker.go b/provider/docker.go index f18cb80d7..65506a740 100644 --- a/provider/docker.go +++ b/provider/docker.go @@ -79,7 +79,8 @@ func (provider *Docker) createClient() (client.APIClient, error) { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { + // TODO register this routine in pool, and watch for stop channel safe.Go(func() { operation := func() error { var err error @@ -127,6 +128,7 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage) er } eventHandler.Handle("start", startStopHandle) eventHandler.Handle("die", startStopHandle) + errChan := events.MonitorWithHandler(ctx, dockerClient, options, eventHandler) if err := <-errChan; err != nil { return err diff --git a/provider/etcd.go b/provider/etcd.go index 5be24a8d5..3d0b9e428 100644 --- a/provider/etcd.go +++ b/provider/etcd.go @@ -1,6 +1,7 @@ package provider import ( + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/docker/libkv/store" "github.com/docker/libkv/store/etcd" @@ -13,8 +14,8 @@ type Etcd struct { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (provider *Etcd) Provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *Etcd) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { provider.storeType = store.ETCD etcd.Register() - return provider.provide(configurationChan) + return provider.provide(configurationChan, pool) } diff --git a/provider/file.go b/provider/file.go index 714eb1901..6b943b199 100644 --- a/provider/file.go +++ b/provider/file.go @@ -19,7 +19,7 @@ type File struct { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (provider *File) Provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *File) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { watcher, err := fsnotify.NewWatcher() if err != nil { log.Error("Error creating file watcher", err) @@ -35,10 +35,12 @@ func (provider *File) Provide(configurationChan chan<- types.ConfigMessage) erro if provider.Watch { // Process events - safe.Go(func() { + pool.Go(func(stop chan bool) { defer watcher.Close() for { select { + case <-stop: + return case event := <-watcher.Events: if strings.Contains(event.Name, file.Name()) { log.Debug("File event:", event) diff --git a/provider/kv.go b/provider/kv.go index b061d6f76..aacddffd8 100644 --- a/provider/kv.go +++ b/provider/kv.go @@ -36,15 +36,17 @@ type KvTLS struct { InsecureSkipVerify bool } -func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string) { +func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) { for { - chanKeys, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */) + events, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */) if err != nil { log.Errorf("Failed to WatchTree %s", err) continue } - - for range chanKeys { + select { + case <-stop: + return + case <-events: configuration := provider.loadConfig() if configuration != nil { configurationChan <- types.ConfigMessage{ @@ -53,11 +55,10 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix } } } - log.Warnf("Intermittent failure to WatchTree KV. Retrying.") } } -func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { storeConfig := &store.Config{ ConnectionTimeout: 30 * time.Second, Bucket: "traefik", @@ -102,8 +103,8 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error } provider.kvclient = kv if provider.Watch { - safe.Go(func() { - provider.watchKv(configurationChan, provider.Prefix) + pool.Go(func(stop chan bool) { + provider.watchKv(configurationChan, provider.Prefix, stop) }) } configuration := provider.loadConfig() diff --git a/provider/kv_test.go b/provider/kv_test.go index 3f4cd4759..965c963ca 100644 --- a/provider/kv_test.go +++ b/provider/kv_test.go @@ -258,7 +258,7 @@ func TestKvWatchTree(t *testing.T) { configChan := make(chan types.ConfigMessage) safe.Go(func() { - provider.watchKv(configChan, "prefix") + provider.watchKv(configChan, "prefix", make(chan bool, 1)) }) select { diff --git a/provider/marathon.go b/provider/marathon.go index e2dfbb3fb..1df13547e 100644 --- a/provider/marathon.go +++ b/provider/marathon.go @@ -40,7 +40,7 @@ type lightMarathonClient interface { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { config := marathon.NewDefaultConfig() config.URL = provider.Endpoint config.EventsTransport = marathon.EventsTransportSSE @@ -64,15 +64,19 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage) if err := client.AddEventsListener(update, marathon.EVENTS_APPLICATIONS); err != nil { log.Errorf("Failed to register for events, %s", err) } else { - safe.Go(func() { + pool.Go(func(stop chan bool) { for { - event := <-update - log.Debug("Marathon event receveived", event) - configuration := provider.loadMarathonConfig() - if configuration != nil { - configurationChan <- types.ConfigMessage{ - ProviderName: "marathon", - Configuration: configuration, + select { + case <-stop: + return + case event := <-update: + log.Debug("Marathon event receveived", event) + configuration := provider.loadMarathonConfig() + if configuration != nil { + configurationChan <- types.ConfigMessage{ + ProviderName: "marathon", + Configuration: configuration, + } } } } diff --git a/provider/provider.go b/provider/provider.go index a1c0c660e..3fa7612ec 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -8,6 +8,7 @@ import ( "github.com/BurntSushi/toml" "github.com/containous/traefik/autogen" + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "unicode" ) @@ -16,7 +17,7 @@ import ( type Provider interface { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. - Provide(configurationChan chan<- types.ConfigMessage) error + Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error } // BaseProvider should be inherited by providers diff --git a/provider/zk.go b/provider/zk.go index 53097b8a4..77b28100f 100644 --- a/provider/zk.go +++ b/provider/zk.go @@ -1,6 +1,7 @@ package provider import ( + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/docker/libkv/store" "github.com/docker/libkv/store/zookeeper" @@ -13,8 +14,8 @@ type Zookepper struct { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (provider *Zookepper) Provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *Zookepper) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { provider.storeType = store.ZK zookeeper.Register() - return provider.provide(configurationChan) + return provider.provide(configurationChan, pool) } diff --git a/safe/routine.go b/safe/routine.go new file mode 100644 index 000000000..7fe9498dc --- /dev/null +++ b/safe/routine.go @@ -0,0 +1,70 @@ +package safe + +import ( + "log" + "runtime/debug" + "sync" +) + +type routine struct { + goroutine func(chan bool) + stop chan bool +} + +// Pool creates a pool of go routines +type Pool struct { + routines []routine + waitGroup sync.WaitGroup + lock sync.Mutex +} + +// Go starts a recoverable goroutine, and can be stopped with stop chan +func (p *Pool) Go(goroutine func(stop chan bool)) { + p.lock.Lock() + newRoutine := routine{ + goroutine: goroutine, + stop: make(chan bool, 1), + } + p.routines = append(p.routines, newRoutine) + p.waitGroup.Add(1) + Go(func() { + goroutine(newRoutine.stop) + p.waitGroup.Done() + }) + p.lock.Unlock() +} + +// Stop stops all started routines, waiting for their termination +func (p *Pool) Stop() { + p.lock.Lock() + for _, routine := range p.routines { + routine.stop <- true + } + p.waitGroup.Wait() + for _, routine := range p.routines { + close(routine.stop) + } + p.lock.Unlock() +} + +// Go starts a recoverable goroutine +func Go(goroutine func()) { + GoWithRecover(goroutine, defaultRecoverGoroutine) +} + +// GoWithRecover starts a recoverable goroutine using given customRecover() function +func GoWithRecover(goroutine func(), customRecover func(err interface{})) { + go func() { + defer func() { + if err := recover(); err != nil { + customRecover(err) + } + }() + goroutine() + }() +} + +func defaultRecoverGoroutine(err interface{}) { + log.Println(err) + debug.PrintStack() +} diff --git a/safe/safe.go b/safe/safe.go index 6cace27a5..e171d34e5 100644 --- a/safe/safe.go +++ b/safe/safe.go @@ -1,28 +1,30 @@ package safe import ( - "log" - "runtime/debug" + "sync" ) -// Go starts a recoverable goroutine -func Go(goroutine func()) { - GoWithRecover(goroutine, defaultRecoverGoroutine) +// Safe contains a thread-safe value +type Safe struct { + value interface{} + lock sync.RWMutex } -// GoWithRecover starts a recoverable goroutine using given customRecover() function -func GoWithRecover(goroutine func(), customRecover func(err interface{})) { - go func() { - defer func() { - if err := recover(); err != nil { - customRecover(err) - } - }() - goroutine() - }() +// New create a new Safe instance given a value +func New(value interface{}) *Safe { + return &Safe{value: value, lock: sync.RWMutex{}} } -func defaultRecoverGoroutine(err interface{}) { - log.Println(err) - debug.PrintStack() +// Get returns the value +func (s *Safe) Get() interface{} { + s.lock.RLock() + defer s.lock.RUnlock() + return s.value +} + +// Set sets a new value +func (s *Safe) Set(value interface{}) { + s.lock.Lock() + defer s.lock.Unlock() + s.value = value } diff --git a/server.go b/server.go index a36892b4d..fc948c462 100644 --- a/server.go +++ b/server.go @@ -15,7 +15,6 @@ import ( "regexp" "sort" "strconv" - "sync" "syscall" "time" @@ -33,6 +32,7 @@ import ( "github.com/containous/traefik/types" "github.com/gorilla/mux" "github.com/mailgun/manners" + "github.com/streamrail/concurrent-map" ) var oxyLogger = &OxyLogger{} @@ -45,10 +45,10 @@ type Server struct { signals chan os.Signal stopChan chan bool providers []provider.Provider - serverLock sync.Mutex - currentConfigurations configs + currentConfigurations safe.Safe globalConfiguration GlobalConfiguration loggerMiddleware *middlewares.Logger + routinesPool safe.Pool } type serverEntryPoints map[string]*serverEntryPoint @@ -71,10 +71,11 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server { server.configurationChan = make(chan types.ConfigMessage, 10) server.configurationValidatedChan = make(chan types.ConfigMessage, 10) server.signals = make(chan os.Signal, 1) - server.stopChan = make(chan bool) + server.stopChan = make(chan bool, 1) server.providers = []provider.Provider{} signal.Notify(server.signals, syscall.SIGINT, syscall.SIGTERM) - server.currentConfigurations = make(configs) + currentConfigurations := make(configs) + server.currentConfigurations.Set(currentConfigurations) server.globalConfiguration = globalConfiguration server.loggerMiddleware = middlewares.NewLogger(globalConfiguration.AccessLogsFile) @@ -84,11 +85,11 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server { // Start starts the server and blocks until server is shutted down. func (server *Server) Start() { server.startHTTPServers() - safe.Go(func() { - server.listenProviders() + server.routinesPool.Go(func(stop chan bool) { + server.listenProviders(stop) }) - safe.Go(func() { - server.listenConfigurations() + server.routinesPool.Go(func(stop chan bool) { + server.listenConfigurations(stop) }) server.configureProviders() server.startProviders() @@ -106,6 +107,7 @@ func (server *Server) Stop() { // Close destroys the server func (server *Server) Close() { + server.routinesPool.Stop() close(server.configurationChan) close(server.configurationValidatedChan) close(server.signals) @@ -126,58 +128,79 @@ func (server *Server) startHTTPServers() { } } -func (server *Server) listenProviders() { - lastReceivedConfiguration := time.Unix(0, 0) - lastConfigs := make(map[string]*types.ConfigMessage) +func (server *Server) listenProviders(stop chan bool) { + lastReceivedConfiguration := safe.New(time.Unix(0, 0)) + lastConfigs := cmap.New() for { - configMsg := <-server.configurationChan - jsonConf, _ := json.Marshal(configMsg.Configuration) - log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf)) - lastConfigs[configMsg.ProviderName] = &configMsg - if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { - log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) - // last config received more than n s ago - server.configurationValidatedChan <- configMsg - } else { - log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) - safe.Go(func() { - <-time.After(server.globalConfiguration.ProvidersThrottleDuration) - if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { - log.Debugf("Waited for %s config, OK", configMsg.ProviderName) - server.configurationValidatedChan <- *lastConfigs[configMsg.ProviderName] - } - }) + select { + case <-stop: + return + case configMsg, ok := <-server.configurationChan: + if !ok { + return + } + jsonConf, _ := json.Marshal(configMsg.Configuration) + log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf)) + lastConfigs.Set(configMsg.ProviderName, &configMsg) + lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time) + if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { + log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) + // last config received more than n s ago + server.configurationValidatedChan <- configMsg + } else { + log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) + server.routinesPool.Go(func(stop chan bool) { + select { + case <-stop: + return + case <-time.After(server.globalConfiguration.ProvidersThrottleDuration): + lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time) + if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { + log.Debugf("Waited for %s config, OK", configMsg.ProviderName) + if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok { + server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage) + } + } + } + }) + } + lastReceivedConfiguration.Set(time.Now()) } - lastReceivedConfiguration = time.Now() } } -func (server *Server) listenConfigurations() { +func (server *Server) listenConfigurations(stop chan bool) { for { - configMsg := <-server.configurationValidatedChan - if configMsg.Configuration == nil { - log.Info("Skipping empty Configuration") - } else if reflect.DeepEqual(server.currentConfigurations[configMsg.ProviderName], configMsg.Configuration) { - log.Info("Skipping same configuration") - } else { - // Copy configurations to new map so we don't change current if LoadConfig fails - newConfigurations := make(configs) - for k, v := range server.currentConfigurations { - newConfigurations[k] = v + select { + case <-stop: + return + case configMsg, ok := <-server.configurationValidatedChan: + if !ok { + return } - newConfigurations[configMsg.ProviderName] = configMsg.Configuration - - newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration) - if err == nil { - server.serverLock.Lock() - for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints { - server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler()) - log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr) - } - server.currentConfigurations = newConfigurations - server.serverLock.Unlock() + currentConfigurations := server.currentConfigurations.Get().(configs) + if configMsg.Configuration == nil { + log.Info("Skipping empty Configuration") + } else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) { + log.Info("Skipping same configuration") } else { - log.Error("Error loading new configuration, aborted ", err) + // Copy configurations to new map so we don't change current if LoadConfig fails + newConfigurations := make(configs) + for k, v := range currentConfigurations { + newConfigurations[k] = v + } + newConfigurations[configMsg.ProviderName] = configMsg.Configuration + + newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration) + if err == nil { + for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints { + server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler()) + log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr) + } + server.currentConfigurations.Set(newConfigurations) + } else { + log.Error("Error loading new configuration, aborted ", err) + } } } } @@ -222,7 +245,7 @@ func (server *Server) startProviders() { log.Infof("Starting provider %v %s", reflect.TypeOf(provider), jsonConf) currentProvider := provider safe.Go(func() { - err := currentProvider.Provide(server.configurationChan) + err := currentProvider.Provide(server.configurationChan, &server.routinesPool) if err != nil { log.Errorf("Error starting provider %s", err) } diff --git a/web.go b/web.go index fcca90fd5..445a83bce 100644 --- a/web.go +++ b/web.go @@ -8,6 +8,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/containous/traefik/autogen" + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/elazarl/go-bindata-assetfs" "github.com/gorilla/mux" @@ -34,7 +35,7 @@ var ( // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessage) error { +func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { systemRouter := mux.NewRouter() // health route @@ -104,13 +105,15 @@ func (provider *WebProvider) getHealthHandler(response http.ResponseWriter, requ } func (provider *WebProvider) getConfigHandler(response http.ResponseWriter, request *http.Request) { - templatesRenderer.JSON(response, http.StatusOK, provider.server.currentConfigurations) + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + templatesRenderer.JSON(response, http.StatusOK, currentConfigurations) } func (provider *WebProvider) getProviderHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] - if provider, ok := provider.server.currentConfigurations[providerID]; ok { + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + if provider, ok := currentConfigurations[providerID]; ok { templatesRenderer.JSON(response, http.StatusOK, provider) } else { http.NotFound(response, request) @@ -120,7 +123,8 @@ func (provider *WebProvider) getProviderHandler(response http.ResponseWriter, re func (provider *WebProvider) getBackendsHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] - if provider, ok := provider.server.currentConfigurations[providerID]; ok { + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + if provider, ok := currentConfigurations[providerID]; ok { templatesRenderer.JSON(response, http.StatusOK, provider.Backends) } else { http.NotFound(response, request) @@ -131,7 +135,8 @@ func (provider *WebProvider) getBackendHandler(response http.ResponseWriter, req vars := mux.Vars(request) providerID := vars["provider"] backendID := vars["backend"] - if provider, ok := provider.server.currentConfigurations[providerID]; ok { + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + if provider, ok := currentConfigurations[providerID]; ok { if backend, ok := provider.Backends[backendID]; ok { templatesRenderer.JSON(response, http.StatusOK, backend) return @@ -144,7 +149,8 @@ func (provider *WebProvider) getServersHandler(response http.ResponseWriter, req vars := mux.Vars(request) providerID := vars["provider"] backendID := vars["backend"] - if provider, ok := provider.server.currentConfigurations[providerID]; ok { + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + if provider, ok := currentConfigurations[providerID]; ok { if backend, ok := provider.Backends[backendID]; ok { templatesRenderer.JSON(response, http.StatusOK, backend.Servers) return @@ -158,7 +164,8 @@ func (provider *WebProvider) getServerHandler(response http.ResponseWriter, requ providerID := vars["provider"] backendID := vars["backend"] serverID := vars["server"] - if provider, ok := provider.server.currentConfigurations[providerID]; ok { + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + if provider, ok := currentConfigurations[providerID]; ok { if backend, ok := provider.Backends[backendID]; ok { if server, ok := backend.Servers[serverID]; ok { templatesRenderer.JSON(response, http.StatusOK, server) @@ -172,7 +179,8 @@ func (provider *WebProvider) getServerHandler(response http.ResponseWriter, requ func (provider *WebProvider) getFrontendsHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] - if provider, ok := provider.server.currentConfigurations[providerID]; ok { + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + if provider, ok := currentConfigurations[providerID]; ok { templatesRenderer.JSON(response, http.StatusOK, provider.Frontends) } else { http.NotFound(response, request) @@ -183,7 +191,8 @@ func (provider *WebProvider) getFrontendHandler(response http.ResponseWriter, re vars := mux.Vars(request) providerID := vars["provider"] frontendID := vars["frontend"] - if provider, ok := provider.server.currentConfigurations[providerID]; ok { + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + if provider, ok := currentConfigurations[providerID]; ok { if frontend, ok := provider.Frontends[frontendID]; ok { templatesRenderer.JSON(response, http.StatusOK, frontend) return @@ -196,7 +205,8 @@ func (provider *WebProvider) getRoutesHandler(response http.ResponseWriter, requ vars := mux.Vars(request) providerID := vars["provider"] frontendID := vars["frontend"] - if provider, ok := provider.server.currentConfigurations[providerID]; ok { + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + if provider, ok := currentConfigurations[providerID]; ok { if frontend, ok := provider.Frontends[frontendID]; ok { templatesRenderer.JSON(response, http.StatusOK, frontend.Routes) return @@ -210,7 +220,8 @@ func (provider *WebProvider) getRouteHandler(response http.ResponseWriter, reque providerID := vars["provider"] frontendID := vars["frontend"] routeID := vars["route"] - if provider, ok := provider.server.currentConfigurations[providerID]; ok { + currentConfigurations := provider.server.currentConfigurations.Get().(configs) + if provider, ok := currentConfigurations[providerID]; ok { if frontend, ok := provider.Frontends[frontendID]; ok { if route, ok := frontend.Routes[routeID]; ok { templatesRenderer.JSON(response, http.StatusOK, route)