Add GoSafe goroutine launch

Signed-off-by: Emile Vauge <emile@vauge.com>
This commit is contained in:
Emile Vauge 2016-03-31 18:57:08 +02:00
parent af41c79798
commit bcc5f24c0f
No known key found for this signature in database
GPG key ID: D808B4C167352E59
10 changed files with 68 additions and 22 deletions

View file

@ -10,6 +10,7 @@ import (
"errors" "errors"
"fmt" "fmt"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/containous/traefik/safe"
"github.com/xenolf/lego/acme" "github.com/xenolf/lego/acme"
"io/ioutil" "io/ioutil"
fmtlog "log" fmtlog "log"
@ -242,7 +243,9 @@ func (a *ACME) CreateConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(doma
return err return err
} }
go a.retrieveCertificates(client, account) safe.Go(func() {
a.retrieveCertificates(client, account)
})
tlsConfig.GetCertificate = func(clientHello *tls.ClientHelloInfo) (*tls.Certificate, error) { tlsConfig.GetCertificate = func(clientHello *tls.ClientHelloInfo) (*tls.Certificate, error) {
if challengeCert, ok := wrapperChallengeProvider.getCertificate(clientHello.ServerName); ok { if challengeCert, ok := wrapperChallengeProvider.getCertificate(clientHello.ServerName); ok {
@ -261,7 +264,7 @@ func (a *ACME) CreateConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(doma
} }
ticker := time.NewTicker(24 * time.Hour) ticker := time.NewTicker(24 * time.Hour)
go func() { safe.Go(func() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
@ -272,7 +275,7 @@ func (a *ACME) CreateConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(doma
} }
} }
}() })
return nil return nil
} }

View file

@ -8,6 +8,7 @@ import (
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/cenkalti/backoff" "github.com/cenkalti/backoff"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
) )
@ -35,7 +36,7 @@ func (provider *ConsulCatalog) watchServices(stopCh <-chan struct{}) <-chan map[
catalog := provider.client.Catalog() catalog := provider.client.Catalog()
go func() { safe.Go(func() {
defer close(watchCh) defer close(watchCh)
opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
@ -64,7 +65,7 @@ func (provider *ConsulCatalog) watchServices(stopCh <-chan struct{}) <-chan map[
watchCh <- data watchCh <- data
} }
} }
}() })
return watchCh return watchCh
} }
@ -182,7 +183,7 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess
} }
provider.client = client provider.client = client
go func() { safe.Go(func() {
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("Consul connection error %+v, retrying in %s", err, time) log.Errorf("Consul connection error %+v, retrying in %s", err, time)
} }
@ -193,7 +194,7 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess
if err != nil { if err != nil {
log.Fatalf("Cannot connect to consul server %+v", err) log.Fatalf("Cannot connect to consul server %+v", err)
} }
}() })
return err return err
} }

View file

@ -10,6 +10,7 @@ import (
"github.com/BurntSushi/ty/fun" "github.com/BurntSushi/ty/fun"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/cenkalti/backoff" "github.com/cenkalti/backoff"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
) )
@ -33,7 +34,7 @@ type DockerTLS struct {
// Provide allows the provider to provide configurations to traefik // Provide allows the provider to provide configurations to traefik
// using the given configuration channel. // using the given configuration channel.
func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage) error { func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage) error {
go func() { safe.Go(func() {
operation := func() error { operation := func() error {
var dockerClient *docker.Client var dockerClient *docker.Client
var err error var err error
@ -93,7 +94,7 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage) er
if err != nil { if err != nil {
log.Fatalf("Cannot connect to docker server %+v", err) log.Fatalf("Cannot connect to docker server %+v", err)
} }
}() })
return nil return nil
} }

View file

@ -7,6 +7,7 @@ import (
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"gopkg.in/fsnotify.v1" "gopkg.in/fsnotify.v1"
) )
@ -34,7 +35,7 @@ func (provider *File) Provide(configurationChan chan<- types.ConfigMessage) erro
if provider.Watch { if provider.Watch {
// Process events // Process events
go func() { safe.Go(func() {
defer watcher.Close() defer watcher.Close()
for { for {
select { select {
@ -53,7 +54,7 @@ func (provider *File) Provide(configurationChan chan<- types.ConfigMessage) erro
log.Error("Watcher event error", error) log.Error("Watcher event error", error)
} }
} }
}() })
err = watcher.Add(filepath.Dir(file.Name())) err = watcher.Add(filepath.Dir(file.Name()))
if err != nil { if err != nil {
log.Error("Error adding file watcher", err) log.Error("Error adding file watcher", err)

View file

@ -12,6 +12,7 @@ import (
"github.com/BurntSushi/ty/fun" "github.com/BurntSushi/ty/fun"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/docker/libkv" "github.com/docker/libkv"
"github.com/docker/libkv/store" "github.com/docker/libkv/store"
@ -101,7 +102,9 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error
} }
provider.kvclient = kv provider.kvclient = kv
if provider.Watch { if provider.Watch {
go provider.watchKv(configurationChan, provider.Prefix) safe.Go(func() {
provider.watchKv(configurationChan, provider.Prefix)
})
} }
configuration := provider.loadConfig() configuration := provider.loadConfig()
configurationChan <- types.ConfigMessage{ configurationChan <- types.ConfigMessage{

View file

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/containous/traefik/safe"
"github.com/docker/libkv/store" "github.com/docker/libkv/store"
"reflect" "reflect"
"sort" "sort"
@ -256,7 +257,9 @@ func TestKvWatchTree(t *testing.T) {
} }
configChan := make(chan types.ConfigMessage) configChan := make(chan types.ConfigMessage)
go provider.watchKv(configChan, "prefix") safe.Go(func() {
provider.watchKv(configChan, "prefix")
})
select { select {
case c1 := <-returnedChans: case c1 := <-returnedChans:

View file

@ -10,6 +10,7 @@ import (
"crypto/tls" "crypto/tls"
"github.com/BurntSushi/ty/fun" "github.com/BurntSushi/ty/fun"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/gambol99/go-marathon" "github.com/gambol99/go-marathon"
"net/http" "net/http"
@ -63,7 +64,7 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage)
if err := client.AddEventsListener(update, marathon.EVENTS_APPLICATIONS); err != nil { if err := client.AddEventsListener(update, marathon.EVENTS_APPLICATIONS); err != nil {
log.Errorf("Failed to register for events, %s", err) log.Errorf("Failed to register for events, %s", err)
} else { } else {
go func() { safe.Go(func() {
for { for {
event := <-update event := <-update
log.Debug("Marathon event receveived", event) log.Debug("Marathon event receveived", event)
@ -75,7 +76,7 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage)
} }
} }
} }
}() })
} }
} }

28
safe/safe.go Normal file
View file

@ -0,0 +1,28 @@
package safe
import (
"log"
"runtime/debug"
)
// Go starts a recoverable goroutine
func Go(goroutine func()) {
GoWithRecover(goroutine, defaultRecoverGoroutine)
}
// GoWithRecover starts a recoverable goroutine using given customRecover() function
func GoWithRecover(goroutine func(), customRecover func(err interface{})) {
go func() {
defer func() {
if err := recover(); err != nil {
customRecover(err)
}
}()
goroutine()
}()
}
func defaultRecoverGoroutine(err interface{}) {
log.Println(err)
debug.PrintStack()
}

View file

@ -32,7 +32,7 @@ docker login -e $DOCKER_EMAIL -u $DOCKER_USER -p $DOCKER_PASS
docker tag containous/traefik emilevauge/traefik:latest docker tag containous/traefik emilevauge/traefik:latest
docker push emilevauge/traefik:latest docker push emilevauge/traefik:latest
docker tag emilevauge/traefik:latest emilevauge/traefik:${VERSION} docker tag emilevauge/traefik:latest emilevauge/traefik:${VERSION}
docker push -q emilevauge/traefik:${VERSION} docker push emilevauge/traefik:${VERSION}
cd .. cd ..
rm -Rf traefik-library-image/ rm -Rf traefik-library-image/

View file

@ -27,6 +27,7 @@ import (
"github.com/containous/oxy/stream" "github.com/containous/oxy/stream"
"github.com/containous/traefik/middlewares" "github.com/containous/traefik/middlewares"
"github.com/containous/traefik/provider" "github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/mailgun/manners" "github.com/mailgun/manners"
@ -81,8 +82,12 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server {
// Start starts the server and blocks until server is shutted down. // Start starts the server and blocks until server is shutted down.
func (server *Server) Start() { func (server *Server) Start() {
server.startHTTPServers() server.startHTTPServers()
go server.listenProviders() safe.Go(func() {
go server.listenConfigurations() server.listenProviders()
})
safe.Go(func() {
server.listenConfigurations()
})
server.configureProviders() server.configureProviders()
server.startProviders() server.startProviders()
go server.listenSignals() go server.listenSignals()
@ -133,13 +138,13 @@ func (server *Server) listenProviders() {
server.configurationValidatedChan <- configMsg server.configurationValidatedChan <- configMsg
} else { } else {
log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
go func() { safe.Go(func() {
<-time.After(server.globalConfiguration.ProvidersThrottleDuration) <-time.After(server.globalConfiguration.ProvidersThrottleDuration)
if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Waited for %s config, OK", configMsg.ProviderName) log.Debugf("Waited for %s config, OK", configMsg.ProviderName)
server.configurationValidatedChan <- *lastConfigs[configMsg.ProviderName] server.configurationValidatedChan <- *lastConfigs[configMsg.ProviderName]
} }
}() })
} }
lastReceivedConfiguration = time.Now() lastReceivedConfiguration = time.Now()
} }
@ -214,12 +219,12 @@ func (server *Server) startProviders() {
jsonConf, _ := json.Marshal(provider) jsonConf, _ := json.Marshal(provider)
log.Infof("Starting provider %v %s", reflect.TypeOf(provider), jsonConf) log.Infof("Starting provider %v %s", reflect.TypeOf(provider), jsonConf)
currentProvider := provider currentProvider := provider
go func() { safe.Go(func() {
err := currentProvider.Provide(server.configurationChan) err := currentProvider.Provide(server.configurationChan)
if err != nil { if err != nil {
log.Errorf("Error starting provider %s", err) log.Errorf("Error starting provider %s", err)
} }
}() })
} }
} }