Merge pull request #305 from containous/fix-races

Fix races
This commit is contained in:
Vincent Demeester 2016-04-15 18:09:50 +02:00
commit 5b2c355c38
22 changed files with 275 additions and 147 deletions

View file

@ -23,9 +23,9 @@ func (oxylogger *OxyLogger) Warningf(format string, args ...interface{}) {
log.Warningf(format, args...)
}
// Errorf logs specified string as Error level in logrus.
// Errorf logs specified string as Warningf level in logrus.
func (oxylogger *OxyLogger) Errorf(format string, args ...interface{}) {
log.Errorf(format, args...)
log.Warningf(format, args...)
}
func notFoundHandler(w http.ResponseWriter, r *http.Request) {

View file

@ -207,6 +207,7 @@ Traefik is obviously slower than Nginx, but not so much: Traefik can serve 28392
Not bad for young project :) !
Some areas of possible improvements:
- Use [GO_REUSEPORT](https://github.com/kavu/go_reuseport) listener
- Run a separate server instance per CPU core with `GOMAXPROCS=1` (it appears during benchmarks that there is a lot more context switches with traefik than with nginx)

View file

@ -518,7 +518,7 @@ Labels can be used on containers to override default behaviour:
- `traefik.protocol=https`: override the default `http` protocol
- `traefik.weight=10`: assign this weight to the container
- `traefik.enable=false`: disable this container in Træfɪk
- `traefik.frontend.rule=Host:test.traefik.io`: override the default frontend rule (Default: `Host:{containerName}.{domain}`). See [frontends](#frontends).
- `traefik.frontend.rule=Host:test.traefik.io`: override the default frontend rule (Default: `Host:{containerName}.{domain}`).
- `traefik.frontend.passHostHeader=true`: forward client `Host` header to the backend.
- `traefik.frontend.entryPoints=http,https`: assign this frontend to entry points `http` and `https`. Overrides `defaultEntryPoints`.
* `traefik.domain=traefik.localhost`: override the default domain
@ -598,7 +598,7 @@ Labels can be used on containers to override default behaviour:
- `traefik.protocol=https`: override the default `http` protocol
- `traefik.weight=10`: assign this weight to the application
- `traefik.enable=false`: disable this application in Træfɪk
- `traefik.frontend.rule=Host:test.traefik.io`: override the default frontend rule (Default: `Host:{containerName}.{domain}`). See [frontends](#frontends).
- `traefik.frontend.rule=Host:test.traefik.io`: override the default frontend rule (Default: `Host:{containerName}.{domain}`).
- `traefik.frontend.passHostHeader=true`: forward client `Host` header to the backend.
- `traefik.frontend.entryPoints=http,https`: assign this frontend to entry points `http` and `https`. Overrides `defaultEntryPoints`.
* `traefik.domain=traefik.localhost`: override the default domain
@ -693,12 +693,13 @@ This backend will create routes matching on hostname based on the service name
used in consul.
Additional settings can be defined using Consul Catalog tags:
- ```traefik.enable=false```: disable this container in Træfɪk
- ```traefik.protocol=https```: override the default `http` protocol
- ```traefik.backend.weight=10```: assign this weight to the container
- ```traefik.backend.circuitbreaker=NetworkErrorRatio() > 0.5```
- ```traefik.backend.loadbalancer=drr```: override the default load balancing mode
- ```traefik.frontend.rule=Host:test.traefik.io```: override the default frontend rule (Default: `Host:{containerName}.{domain}`). See [frontends](#frontends).
- ```traefik.frontend.rule=Host:test.traefik.io```: override the default frontend rule (Default: `Host:{containerName}.{domain}`).
- ```traefik.frontend.passHostHeader=true```: forward client `Host` header to the backend.
- ```traefik.frontend.entryPoints=http,https```: assign this frontend to entry points `http` and `https`. Overrides `defaultEntryPoints`.

30
glide.lock generated
View file

@ -1,5 +1,5 @@
hash: 79b6eb2a613b5e2ce5c57150eec41ac04def3f232a3613fd8b5a88b5e1041b38
updated: 2016-04-02T15:42:37.505896092+02:00
hash: fffa87220825895f7e3a6ceed3b13ecbf6bc934ab072fc9be3d00e3eef411ecb
updated: 2016-04-13T14:05:41.300658168+02:00
imports:
- name: github.com/alecthomas/template
version: b867cc6ab45cece8143cfcc6fc9c77cf3f2c23c0
@ -42,6 +42,7 @@ imports:
version: ff6f38ccb69afa96214c7ee955359465d1fc767a
subpackages:
- reference
- digest
- name: github.com/docker/docker
version: f39987afe8d611407887b3094c03d6ba6a766a67
subpackages:
@ -89,11 +90,14 @@ imports:
- types/container
- types/filters
- types/strslice
- types/events
- client/transport
- client/transport/cancellable
- types/network
- types/reference
- types/registry
- types/time
- types/versions
- types/blkiodev
- name: github.com/docker/go-connections
version: f549a9393d05688dff0992ef3efd8bbe6c628aeb
@ -128,7 +132,7 @@ imports:
- name: github.com/golang/glog
version: fca8c8854093a154ff1eb580aae10276ad6b1b5f
- name: github.com/google/go-querystring
version: 6bb77fe6f42b85397288d4f6f67ac72f8f400ee7
version: 9235644dd9e52eeae6fa48efd539fdc351a0af53
subpackages:
- query
- name: github.com/gorilla/context
@ -171,9 +175,9 @@ imports:
- name: github.com/mailgun/timetools
version: fd192d755b00c968d312d23f521eb0cdc6f66bd0
- name: github.com/Microsoft/go-winio
version: 9e2895e5f6c3f16473b91d37fae6e89990a4520c
version: 862b6557927a5c5c81e411c12aa6de7e566cbb7a
- name: github.com/miekg/dns
version: 7e024ce8ce18b21b475ac6baf8fa3c42536bf2fa
version: dd83d5cbcfd986f334b2747feeb907e281318fdf
- name: github.com/mitchellh/mapstructure
version: d2dd0262208475919e1a362f675cfc0e7c10e905
- name: github.com/opencontainers/runc
@ -193,19 +197,21 @@ imports:
- name: github.com/spf13/cast
version: ee7b3e0353166ab1f3a605294ac8cd2b77953778
- name: github.com/spf13/cobra
version: 2ccf9e982a3e3eb21eba9c9ad8e546529fd74c71
version: 4c05eb1145f16d0e6bb4a3e1b6d769f4713cb41f
subpackages:
- cobra
- name: github.com/spf13/jwalterweatherman
version: 33c24e77fb80341fe7130ee7c594256ff08ccc46
- name: github.com/spf13/pflag
version: 7f60f83a2c81bc3c3c0d5297f61ddfa68da9d3b7
version: 1f296710f879815ad9e6d39d947c828c3e4b4c3d
- name: github.com/spf13/viper
version: a212099cbe6fbe8d07476bfda8d2d39b6ff8f325
- name: github.com/streamrail/concurrent-map
version: 788b276dc7eabf20890ea3fa280956664d58b329
- name: github.com/stretchr/objx
version: cbeaeb16a013161a98496fad62933b1d21786672
- name: github.com/stretchr/testify
version: 6fe211e493929a8aac0469b93f28b1d0688a9a3a
version: bcd9e3389dd03b0b668d11f4d462a6af6c2dfd60
subpackages:
- mock
- assert
@ -214,13 +220,13 @@ imports:
- name: github.com/unrolled/render
version: 26b4e3aac686940fe29521545afad9966ddfc80c
- name: github.com/vdemeester/docker-events
version: bd72e1848b08db4b5ed1a2e9277621b9f5e5d1f3
version: 6ea3f28df37f29a47498bc8b32b36ad8491dbd37
- name: github.com/vdemeester/libkermit
version: 7e4e689a6fa9281e0fb9b7b9c297e22d5342a5ec
- name: github.com/vdemeester/shakers
version: 24d7f1d6a71aa5d9cbe7390e4afb66b7eef9e1b3
- name: github.com/vulcand/oxy
version: 8aaf36279137ac04ace3792a4f86098631b27d5a
version: 11677428db34c4a05354d66d028174d0e3c6e905
subpackages:
- memmetrics
- utils
@ -237,11 +243,11 @@ imports:
- name: github.com/wendal/errors
version: f66c77a7882b399795a8987ebf87ef64a427417e
- name: github.com/xenolf/lego
version: ca19a90028e242e878585941c2a27c8f3b3efc25
version: 23e88185c255e95a106835d80e76e5a3a66d7c54
subpackages:
- acme
- name: golang.org/x/crypto
version: 9e7f5dc375abeb9619ea3c5c58502c428f457aa2
version: d68c3ecb62c850b645dc072a8d78006286bf81ca
subpackages:
- ocsp
- name: golang.org/x/net

View file

@ -174,3 +174,4 @@ import:
- tlsconfig
- package: github.com/docker/go-units
- package: github.com/mailgun/multibuf
- package: github.com/streamrail/concurrent-map

View file

@ -1,40 +1,35 @@
package middlewares
import (
"github.com/containous/traefik/safe"
"github.com/gorilla/mux"
"net/http"
"sync"
)
// HandlerSwitcher allows hot switching of http.ServeMux
type HandlerSwitcher struct {
handler *mux.Router
handlerLock *sync.Mutex
handler *safe.Safe
}
// NewHandlerSwitcher builds a new instance of HandlerSwitcher
func NewHandlerSwitcher(newHandler *mux.Router) (hs *HandlerSwitcher) {
return &HandlerSwitcher{
handler: newHandler,
handlerLock: &sync.Mutex{},
handler: safe.New(newHandler),
}
}
func (hs *HandlerSwitcher) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
hs.handlerLock.Lock()
handlerBackup := hs.handler
hs.handlerLock.Unlock()
handlerBackup := hs.handler.Get().(*mux.Router)
handlerBackup.ServeHTTP(rw, r)
}
// GetHandler returns the current http.ServeMux
func (hs *HandlerSwitcher) GetHandler() (newHandler *mux.Router) {
return hs.handler
handler := hs.handler.Get().(*mux.Router)
return handler
}
// UpdateHandler safely updates the current http.ServeMux with a new one
func (hs *HandlerSwitcher) UpdateHandler(newHandler *mux.Router) {
hs.handlerLock.Lock()
hs.handler = newHandler
defer hs.handlerLock.Unlock()
hs.handler.Set(newHandler)
}

View file

@ -35,5 +35,7 @@ func (l *Logger) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http.Ha
// Close closes the logger (i.e. the file).
func (l *Logger) Close() {
if l.file != nil {
l.file.Close()
}
}

View file

@ -1,6 +1,7 @@
package provider
import (
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/boltdb"
@ -13,8 +14,8 @@ type BoltDb struct {
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *BoltDb) Provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *BoltDb) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
provider.storeType = store.BOLTDB
boltdb.Register()
return provider.provide(configurationChan)
return provider.provide(configurationChan, pool)
}

View file

@ -1,6 +1,7 @@
package provider
import (
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
@ -13,8 +14,8 @@ type Consul struct {
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Consul) Provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *Consul) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
provider.storeType = store.CONSUL
consul.Register()
return provider.provide(configurationChan)
return provider.provide(configurationChan, pool)
}

View file

@ -189,7 +189,7 @@ func (provider *ConsulCatalog) getNodes(index map[string][]string) ([]catalogUpd
return nodes, nil
}
func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessage) error {
func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error {
stopCh := make(chan struct{})
serviceCatalog := provider.watchServices(stopCh)
@ -197,6 +197,8 @@ func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessag
for {
select {
case <-stop:
return nil
case index, ok := <-serviceCatalog:
if !ok {
return errors.New("Consul service list nil")
@ -217,7 +219,7 @@ func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessag
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
config := api.DefaultConfig()
config.Address = provider.Endpoint
client, err := api.NewClient(config)
@ -226,12 +228,12 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess
}
provider.client = client
safe.Go(func() {
pool.Go(func(stop chan bool) {
notify := func(err error, time time.Duration) {
log.Errorf("Consul connection error %+v, retrying in %s", err, time)
}
worker := func() error {
return provider.watch(configurationChan)
return provider.watch(configurationChan, stop)
}
err := backoff.RetryNotify(worker, backoff.NewExponentialBackOff(), notify)
if err != nil {

View file

@ -79,7 +79,8 @@ func (provider *Docker) createClient() (client.APIClient, error) {
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
// TODO register this routine in pool, and watch for stop channel
safe.Go(func() {
operation := func() error {
var err error
@ -127,6 +128,7 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage) er
}
eventHandler.Handle("start", startStopHandle)
eventHandler.Handle("die", startStopHandle)
errChan := events.MonitorWithHandler(ctx, dockerClient, options, eventHandler)
if err := <-errChan; err != nil {
return err

View file

@ -1,6 +1,7 @@
package provider
import (
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/etcd"
@ -13,8 +14,8 @@ type Etcd struct {
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Etcd) Provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *Etcd) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
provider.storeType = store.ETCD
etcd.Register()
return provider.provide(configurationChan)
return provider.provide(configurationChan, pool)
}

View file

@ -19,7 +19,7 @@ type File struct {
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *File) Provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *File) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Error("Error creating file watcher", err)
@ -35,10 +35,12 @@ func (provider *File) Provide(configurationChan chan<- types.ConfigMessage) erro
if provider.Watch {
// Process events
safe.Go(func() {
pool.Go(func(stop chan bool) {
defer watcher.Close()
for {
select {
case <-stop:
return
case event := <-watcher.Events:
if strings.Contains(event.Name, file.Name()) {
log.Debug("File event:", event)

View file

@ -36,15 +36,17 @@ type KvTLS struct {
InsecureSkipVerify bool
}
func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string) {
func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) {
for {
chanKeys, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */)
events, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */)
if err != nil {
log.Errorf("Failed to WatchTree %s", err)
continue
}
for range chanKeys {
select {
case <-stop:
return
case <-events:
configuration := provider.loadConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
@ -53,11 +55,10 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix
}
}
}
log.Warnf("Intermittent failure to WatchTree KV. Retrying.")
}
}
func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
storeConfig := &store.Config{
ConnectionTimeout: 30 * time.Second,
Bucket: "traefik",
@ -102,8 +103,8 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error
}
provider.kvclient = kv
if provider.Watch {
safe.Go(func() {
provider.watchKv(configurationChan, provider.Prefix)
pool.Go(func(stop chan bool) {
provider.watchKv(configurationChan, provider.Prefix, stop)
})
}
configuration := provider.loadConfig()

View file

@ -258,7 +258,7 @@ func TestKvWatchTree(t *testing.T) {
configChan := make(chan types.ConfigMessage)
safe.Go(func() {
provider.watchKv(configChan, "prefix")
provider.watchKv(configChan, "prefix", make(chan bool, 1))
})
select {

View file

@ -40,7 +40,7 @@ type lightMarathonClient interface {
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
config := marathon.NewDefaultConfig()
config.URL = provider.Endpoint
config.EventsTransport = marathon.EventsTransportSSE
@ -64,9 +64,12 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage)
if err := client.AddEventsListener(update, marathon.EVENTS_APPLICATIONS); err != nil {
log.Errorf("Failed to register for events, %s", err)
} else {
safe.Go(func() {
pool.Go(func(stop chan bool) {
for {
event := <-update
select {
case <-stop:
return
case event := <-update:
log.Debug("Marathon event receveived", event)
configuration := provider.loadMarathonConfig()
if configuration != nil {
@ -76,6 +79,7 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage)
}
}
}
}
})
}
}

View file

@ -8,6 +8,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/containous/traefik/autogen"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"unicode"
)
@ -16,7 +17,7 @@ import (
type Provider interface {
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
Provide(configurationChan chan<- types.ConfigMessage) error
Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error
}
// BaseProvider should be inherited by providers

View file

@ -1,6 +1,7 @@
package provider
import (
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/zookeeper"
@ -13,8 +14,8 @@ type Zookepper struct {
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Zookepper) Provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *Zookepper) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
provider.storeType = store.ZK
zookeeper.Register()
return provider.provide(configurationChan)
return provider.provide(configurationChan, pool)
}

70
safe/routine.go Normal file
View file

@ -0,0 +1,70 @@
package safe
import (
"log"
"runtime/debug"
"sync"
)
type routine struct {
goroutine func(chan bool)
stop chan bool
}
// Pool creates a pool of go routines
type Pool struct {
routines []routine
waitGroup sync.WaitGroup
lock sync.Mutex
}
// Go starts a recoverable goroutine, and can be stopped with stop chan
func (p *Pool) Go(goroutine func(stop chan bool)) {
p.lock.Lock()
newRoutine := routine{
goroutine: goroutine,
stop: make(chan bool, 1),
}
p.routines = append(p.routines, newRoutine)
p.waitGroup.Add(1)
Go(func() {
goroutine(newRoutine.stop)
p.waitGroup.Done()
})
p.lock.Unlock()
}
// Stop stops all started routines, waiting for their termination
func (p *Pool) Stop() {
p.lock.Lock()
for _, routine := range p.routines {
routine.stop <- true
}
p.waitGroup.Wait()
for _, routine := range p.routines {
close(routine.stop)
}
p.lock.Unlock()
}
// Go starts a recoverable goroutine
func Go(goroutine func()) {
GoWithRecover(goroutine, defaultRecoverGoroutine)
}
// GoWithRecover starts a recoverable goroutine using given customRecover() function
func GoWithRecover(goroutine func(), customRecover func(err interface{})) {
go func() {
defer func() {
if err := recover(); err != nil {
customRecover(err)
}
}()
goroutine()
}()
}
func defaultRecoverGoroutine(err interface{}) {
log.Println(err)
debug.PrintStack()
}

View file

@ -1,28 +1,30 @@
package safe
import (
"log"
"runtime/debug"
"sync"
)
// Go starts a recoverable goroutine
func Go(goroutine func()) {
GoWithRecover(goroutine, defaultRecoverGoroutine)
// Safe contains a thread-safe value
type Safe struct {
value interface{}
lock sync.RWMutex
}
// GoWithRecover starts a recoverable goroutine using given customRecover() function
func GoWithRecover(goroutine func(), customRecover func(err interface{})) {
go func() {
defer func() {
if err := recover(); err != nil {
customRecover(err)
}
}()
goroutine()
}()
// New create a new Safe instance given a value
func New(value interface{}) *Safe {
return &Safe{value: value, lock: sync.RWMutex{}}
}
func defaultRecoverGoroutine(err interface{}) {
log.Println(err)
debug.PrintStack()
// Get returns the value
func (s *Safe) Get() interface{} {
s.lock.RLock()
defer s.lock.RUnlock()
return s.value
}
// Set sets a new value
func (s *Safe) Set(value interface{}) {
s.lock.Lock()
defer s.lock.Unlock()
s.value = value
}

View file

@ -15,7 +15,6 @@ import (
"regexp"
"sort"
"strconv"
"sync"
"syscall"
"time"
@ -33,6 +32,7 @@ import (
"github.com/containous/traefik/types"
"github.com/gorilla/mux"
"github.com/mailgun/manners"
"github.com/streamrail/concurrent-map"
)
var oxyLogger = &OxyLogger{}
@ -45,10 +45,10 @@ type Server struct {
signals chan os.Signal
stopChan chan bool
providers []provider.Provider
serverLock sync.Mutex
currentConfigurations configs
currentConfigurations safe.Safe
globalConfiguration GlobalConfiguration
loggerMiddleware *middlewares.Logger
routinesPool safe.Pool
}
type serverEntryPoints map[string]*serverEntryPoint
@ -71,10 +71,11 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server {
server.configurationChan = make(chan types.ConfigMessage, 10)
server.configurationValidatedChan = make(chan types.ConfigMessage, 10)
server.signals = make(chan os.Signal, 1)
server.stopChan = make(chan bool)
server.stopChan = make(chan bool, 1)
server.providers = []provider.Provider{}
signal.Notify(server.signals, syscall.SIGINT, syscall.SIGTERM)
server.currentConfigurations = make(configs)
currentConfigurations := make(configs)
server.currentConfigurations.Set(currentConfigurations)
server.globalConfiguration = globalConfiguration
server.loggerMiddleware = middlewares.NewLogger(globalConfiguration.AccessLogsFile)
@ -84,11 +85,11 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server {
// Start starts the server and blocks until server is shutted down.
func (server *Server) Start() {
server.startHTTPServers()
safe.Go(func() {
server.listenProviders()
server.routinesPool.Go(func(stop chan bool) {
server.listenProviders(stop)
})
safe.Go(func() {
server.listenConfigurations()
server.routinesPool.Go(func(stop chan bool) {
server.listenConfigurations(stop)
})
server.configureProviders()
server.startProviders()
@ -106,6 +107,7 @@ func (server *Server) Stop() {
// Close destroys the server
func (server *Server) Close() {
server.routinesPool.Stop()
close(server.configurationChan)
close(server.configurationValidatedChan)
close(server.signals)
@ -126,61 +128,82 @@ func (server *Server) startHTTPServers() {
}
}
func (server *Server) listenProviders() {
lastReceivedConfiguration := time.Unix(0, 0)
lastConfigs := make(map[string]*types.ConfigMessage)
func (server *Server) listenProviders(stop chan bool) {
lastReceivedConfiguration := safe.New(time.Unix(0, 0))
lastConfigs := cmap.New()
for {
configMsg := <-server.configurationChan
select {
case <-stop:
return
case configMsg, ok := <-server.configurationChan:
if !ok {
return
}
jsonConf, _ := json.Marshal(configMsg.Configuration)
log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf))
lastConfigs[configMsg.ProviderName] = &configMsg
if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
lastConfigs.Set(configMsg.ProviderName, &configMsg)
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
// last config received more than n s ago
server.configurationValidatedChan <- configMsg
} else {
log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
safe.Go(func() {
<-time.After(server.globalConfiguration.ProvidersThrottleDuration)
if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
server.routinesPool.Go(func(stop chan bool) {
select {
case <-stop:
return
case <-time.After(server.globalConfiguration.ProvidersThrottleDuration):
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Waited for %s config, OK", configMsg.ProviderName)
server.configurationValidatedChan <- *lastConfigs[configMsg.ProviderName]
if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok {
server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage)
}
}
}
})
}
lastReceivedConfiguration = time.Now()
lastReceivedConfiguration.Set(time.Now())
}
}
}
func (server *Server) listenConfigurations() {
func (server *Server) listenConfigurations(stop chan bool) {
for {
configMsg := <-server.configurationValidatedChan
select {
case <-stop:
return
case configMsg, ok := <-server.configurationValidatedChan:
if !ok {
return
}
currentConfigurations := server.currentConfigurations.Get().(configs)
if configMsg.Configuration == nil {
log.Info("Skipping empty Configuration")
} else if reflect.DeepEqual(server.currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
} else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
log.Info("Skipping same configuration")
} else {
// Copy configurations to new map so we don't change current if LoadConfig fails
newConfigurations := make(configs)
for k, v := range server.currentConfigurations {
for k, v := range currentConfigurations {
newConfigurations[k] = v
}
newConfigurations[configMsg.ProviderName] = configMsg.Configuration
newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration)
if err == nil {
server.serverLock.Lock()
for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints {
server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler())
log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr)
}
server.currentConfigurations = newConfigurations
server.serverLock.Unlock()
server.currentConfigurations.Set(newConfigurations)
} else {
log.Error("Error loading new configuration, aborted ", err)
}
}
}
}
}
func (server *Server) configureProviders() {
@ -222,7 +245,7 @@ func (server *Server) startProviders() {
log.Infof("Starting provider %v %s", reflect.TypeOf(provider), jsonConf)
currentProvider := provider
safe.Go(func() {
err := currentProvider.Provide(server.configurationChan)
err := currentProvider.Provide(server.configurationChan, &server.routinesPool)
if err != nil {
log.Errorf("Error starting provider %s", err)
}

33
web.go
View file

@ -8,6 +8,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/containous/traefik/autogen"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/elazarl/go-bindata-assetfs"
"github.com/gorilla/mux"
@ -34,7 +35,7 @@ var (
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessage) error {
func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
systemRouter := mux.NewRouter()
// health route
@ -104,13 +105,15 @@ func (provider *WebProvider) getHealthHandler(response http.ResponseWriter, requ
}
func (provider *WebProvider) getConfigHandler(response http.ResponseWriter, request *http.Request) {
templatesRenderer.JSON(response, http.StatusOK, provider.server.currentConfigurations)
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
templatesRenderer.JSON(response, http.StatusOK, currentConfigurations)
}
func (provider *WebProvider) getProviderHandler(response http.ResponseWriter, request *http.Request) {
vars := mux.Vars(request)
providerID := vars["provider"]
if provider, ok := provider.server.currentConfigurations[providerID]; ok {
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
if provider, ok := currentConfigurations[providerID]; ok {
templatesRenderer.JSON(response, http.StatusOK, provider)
} else {
http.NotFound(response, request)
@ -120,7 +123,8 @@ func (provider *WebProvider) getProviderHandler(response http.ResponseWriter, re
func (provider *WebProvider) getBackendsHandler(response http.ResponseWriter, request *http.Request) {
vars := mux.Vars(request)
providerID := vars["provider"]
if provider, ok := provider.server.currentConfigurations[providerID]; ok {
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
if provider, ok := currentConfigurations[providerID]; ok {
templatesRenderer.JSON(response, http.StatusOK, provider.Backends)
} else {
http.NotFound(response, request)
@ -131,7 +135,8 @@ func (provider *WebProvider) getBackendHandler(response http.ResponseWriter, req
vars := mux.Vars(request)
providerID := vars["provider"]
backendID := vars["backend"]
if provider, ok := provider.server.currentConfigurations[providerID]; ok {
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
if provider, ok := currentConfigurations[providerID]; ok {
if backend, ok := provider.Backends[backendID]; ok {
templatesRenderer.JSON(response, http.StatusOK, backend)
return
@ -144,7 +149,8 @@ func (provider *WebProvider) getServersHandler(response http.ResponseWriter, req
vars := mux.Vars(request)
providerID := vars["provider"]
backendID := vars["backend"]
if provider, ok := provider.server.currentConfigurations[providerID]; ok {
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
if provider, ok := currentConfigurations[providerID]; ok {
if backend, ok := provider.Backends[backendID]; ok {
templatesRenderer.JSON(response, http.StatusOK, backend.Servers)
return
@ -158,7 +164,8 @@ func (provider *WebProvider) getServerHandler(response http.ResponseWriter, requ
providerID := vars["provider"]
backendID := vars["backend"]
serverID := vars["server"]
if provider, ok := provider.server.currentConfigurations[providerID]; ok {
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
if provider, ok := currentConfigurations[providerID]; ok {
if backend, ok := provider.Backends[backendID]; ok {
if server, ok := backend.Servers[serverID]; ok {
templatesRenderer.JSON(response, http.StatusOK, server)
@ -172,7 +179,8 @@ func (provider *WebProvider) getServerHandler(response http.ResponseWriter, requ
func (provider *WebProvider) getFrontendsHandler(response http.ResponseWriter, request *http.Request) {
vars := mux.Vars(request)
providerID := vars["provider"]
if provider, ok := provider.server.currentConfigurations[providerID]; ok {
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
if provider, ok := currentConfigurations[providerID]; ok {
templatesRenderer.JSON(response, http.StatusOK, provider.Frontends)
} else {
http.NotFound(response, request)
@ -183,7 +191,8 @@ func (provider *WebProvider) getFrontendHandler(response http.ResponseWriter, re
vars := mux.Vars(request)
providerID := vars["provider"]
frontendID := vars["frontend"]
if provider, ok := provider.server.currentConfigurations[providerID]; ok {
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
if provider, ok := currentConfigurations[providerID]; ok {
if frontend, ok := provider.Frontends[frontendID]; ok {
templatesRenderer.JSON(response, http.StatusOK, frontend)
return
@ -196,7 +205,8 @@ func (provider *WebProvider) getRoutesHandler(response http.ResponseWriter, requ
vars := mux.Vars(request)
providerID := vars["provider"]
frontendID := vars["frontend"]
if provider, ok := provider.server.currentConfigurations[providerID]; ok {
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
if provider, ok := currentConfigurations[providerID]; ok {
if frontend, ok := provider.Frontends[frontendID]; ok {
templatesRenderer.JSON(response, http.StatusOK, frontend.Routes)
return
@ -210,7 +220,8 @@ func (provider *WebProvider) getRouteHandler(response http.ResponseWriter, reque
providerID := vars["provider"]
frontendID := vars["frontend"]
routeID := vars["route"]
if provider, ok := provider.server.currentConfigurations[providerID]; ok {
currentConfigurations := provider.server.currentConfigurations.Get().(configs)
if provider, ok := currentConfigurations[providerID]; ok {
if frontend, ok := provider.Frontends[frontendID]; ok {
if route, ok := frontend.Routes[routeID]; ok {
templatesRenderer.JSON(response, http.StatusOK, route)