Extract providers configuration from server.go
This commit is contained in:
parent
ef4aa202d0
commit
395b1702de
5 changed files with 121 additions and 89 deletions
|
@ -153,7 +153,7 @@ func run(globalConfiguration *configuration.GlobalConfiguration, configFile stri
|
||||||
stats(globalConfiguration)
|
stats(globalConfiguration)
|
||||||
|
|
||||||
log.Debugf("Global configuration loaded %s", string(jsonConf))
|
log.Debugf("Global configuration loaded %s", string(jsonConf))
|
||||||
svr := server.NewServer(*globalConfiguration)
|
svr := server.NewServer(*globalConfiguration, configuration.NewProviderAggregator(globalConfiguration))
|
||||||
svr.Start()
|
svr.Start()
|
||||||
defer svr.Close()
|
defer svr.Close()
|
||||||
|
|
||||||
|
|
91
configuration/provider_aggregator.go
Normal file
91
configuration/provider_aggregator.go
Normal file
|
@ -0,0 +1,91 @@
|
||||||
|
package configuration
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/containous/traefik/log"
|
||||||
|
"github.com/containous/traefik/provider"
|
||||||
|
"github.com/containous/traefik/safe"
|
||||||
|
"github.com/containous/traefik/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type providerAggregator struct {
|
||||||
|
providers []provider.Provider
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewProviderAggregator return an aggregate of all the providers configured in GlobalConfiguration
|
||||||
|
func NewProviderAggregator(gc *GlobalConfiguration) provider.Provider {
|
||||||
|
provider := providerAggregator{}
|
||||||
|
if gc.Docker != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Docker)
|
||||||
|
}
|
||||||
|
if gc.Marathon != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Marathon)
|
||||||
|
}
|
||||||
|
if gc.File != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.File)
|
||||||
|
}
|
||||||
|
if gc.Rest != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Rest)
|
||||||
|
}
|
||||||
|
if gc.Consul != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Consul)
|
||||||
|
}
|
||||||
|
if gc.ConsulCatalog != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.ConsulCatalog)
|
||||||
|
}
|
||||||
|
if gc.Etcd != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Etcd)
|
||||||
|
}
|
||||||
|
if gc.Zookeeper != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Zookeeper)
|
||||||
|
}
|
||||||
|
if gc.Boltdb != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Boltdb)
|
||||||
|
}
|
||||||
|
if gc.Kubernetes != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Kubernetes)
|
||||||
|
}
|
||||||
|
if gc.Mesos != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Mesos)
|
||||||
|
}
|
||||||
|
if gc.Eureka != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Eureka)
|
||||||
|
}
|
||||||
|
if gc.ECS != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.ECS)
|
||||||
|
}
|
||||||
|
if gc.Rancher != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.Rancher)
|
||||||
|
}
|
||||||
|
if gc.DynamoDB != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.DynamoDB)
|
||||||
|
}
|
||||||
|
if gc.ServiceFabric != nil {
|
||||||
|
provider.providers = append(provider.providers, gc.ServiceFabric)
|
||||||
|
}
|
||||||
|
if len(provider.providers) == 1 {
|
||||||
|
return provider.providers[0]
|
||||||
|
}
|
||||||
|
return provider
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p providerAggregator) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
|
||||||
|
for _, p := range p.providers {
|
||||||
|
providerType := reflect.TypeOf(p)
|
||||||
|
jsonConf, err := json.Marshal(p)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf("Unable to marshal provider conf %v with error: %v", providerType, err)
|
||||||
|
}
|
||||||
|
log.Infof("Starting provider %v %s", providerType, jsonConf)
|
||||||
|
currentProvider := p
|
||||||
|
safe.Go(func() {
|
||||||
|
err := currentProvider.Provide(configurationChan, pool, constraints)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Error starting provider %v: %s", providerType, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -17,7 +17,6 @@ import (
|
||||||
type Provider struct {
|
type Provider struct {
|
||||||
configurationChan chan<- types.ConfigMessage
|
configurationChan chan<- types.ConfigMessage
|
||||||
EntryPoint string `description:"EntryPoint" export:"true"`
|
EntryPoint string `description:"EntryPoint" export:"true"`
|
||||||
CurrentConfigurations *safe.Safe
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var templatesRenderer = render.New(render.Options{Directory: "nowhere"})
|
var templatesRenderer = render.New(render.Options{Directory: "nowhere"})
|
||||||
|
@ -45,7 +44,10 @@ func (p *Provider) AddRoutes(systemRouter *mux.Router) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// TODO: Deprecated configuration - Change to `rest` in the future
|
// TODO: Deprecated configuration - Change to `rest` in the future
|
||||||
p.configurationChan <- types.ConfigMessage{ProviderName: "web", Configuration: configuration}
|
p.configurationChan <- types.ConfigMessage{ProviderName: "web", Configuration: configuration}
|
||||||
p.getConfigHandler(response, request)
|
err := templatesRenderer.JSON(response, http.StatusOK, configuration)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("Error parsing configuration %+v", err)
|
log.Errorf("Error parsing configuration %+v", err)
|
||||||
http.Error(response, fmt.Sprintf("%+v", err), http.StatusBadRequest)
|
http.Error(response, fmt.Sprintf("%+v", err), http.StatusBadRequest)
|
||||||
|
@ -59,11 +61,3 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
|
||||||
p.configurationChan = configurationChan
|
p.configurationChan = configurationChan
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) getConfigHandler(response http.ResponseWriter, request *http.Request) {
|
|
||||||
currentConfigurations := p.CurrentConfigurations.Get().(types.Configurations)
|
|
||||||
err := templatesRenderer.JSON(response, http.StatusOK, currentConfigurations)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -65,7 +65,6 @@ type Server struct {
|
||||||
configurationValidatedChan chan types.ConfigMessage
|
configurationValidatedChan chan types.ConfigMessage
|
||||||
signals chan os.Signal
|
signals chan os.Signal
|
||||||
stopChan chan bool
|
stopChan chan bool
|
||||||
providers []provider.Provider
|
|
||||||
currentConfigurations safe.Safe
|
currentConfigurations safe.Safe
|
||||||
providerConfigUpdateMap map[string]chan types.ConfigMessage
|
providerConfigUpdateMap map[string]chan types.ConfigMessage
|
||||||
globalConfiguration configuration.GlobalConfiguration
|
globalConfiguration configuration.GlobalConfiguration
|
||||||
|
@ -75,6 +74,7 @@ type Server struct {
|
||||||
leadership *cluster.Leadership
|
leadership *cluster.Leadership
|
||||||
defaultForwardingRoundTripper http.RoundTripper
|
defaultForwardingRoundTripper http.RoundTripper
|
||||||
metricsRegistry metrics.Registry
|
metricsRegistry metrics.Registry
|
||||||
|
provider provider.Provider
|
||||||
}
|
}
|
||||||
|
|
||||||
type serverEntryPoints map[string]*serverEntryPoint
|
type serverEntryPoints map[string]*serverEntryPoint
|
||||||
|
@ -96,15 +96,15 @@ type serverRoute struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer returns an initialized Server.
|
// NewServer returns an initialized Server.
|
||||||
func NewServer(globalConfiguration configuration.GlobalConfiguration) *Server {
|
func NewServer(globalConfiguration configuration.GlobalConfiguration, provider provider.Provider) *Server {
|
||||||
server := new(Server)
|
server := new(Server)
|
||||||
|
|
||||||
|
server.provider = provider
|
||||||
server.serverEntryPoints = make(map[string]*serverEntryPoint)
|
server.serverEntryPoints = make(map[string]*serverEntryPoint)
|
||||||
server.configurationChan = make(chan types.ConfigMessage, 100)
|
server.configurationChan = make(chan types.ConfigMessage, 100)
|
||||||
server.configurationValidatedChan = make(chan types.ConfigMessage, 100)
|
server.configurationValidatedChan = make(chan types.ConfigMessage, 100)
|
||||||
server.signals = make(chan os.Signal, 1)
|
server.signals = make(chan os.Signal, 1)
|
||||||
server.stopChan = make(chan bool, 1)
|
server.stopChan = make(chan bool, 1)
|
||||||
server.providers = []provider.Provider{}
|
|
||||||
server.configureSignals()
|
server.configureSignals()
|
||||||
currentConfigurations := make(types.Configurations)
|
currentConfigurations := make(types.Configurations)
|
||||||
server.currentConfigurations.Set(currentConfigurations)
|
server.currentConfigurations.Set(currentConfigurations)
|
||||||
|
@ -207,8 +207,7 @@ func (s *Server) Start() {
|
||||||
s.routinesPool.Go(func(stop chan bool) {
|
s.routinesPool.Go(func(stop chan bool) {
|
||||||
s.listenConfigurations(stop)
|
s.listenConfigurations(stop)
|
||||||
})
|
})
|
||||||
s.configureProviders()
|
s.startProvider()
|
||||||
s.startProviders()
|
|
||||||
go s.listenSignals()
|
go s.listenSignals()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -541,73 +540,21 @@ func (s *Server) postLoadConfiguration() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) configureProviders() {
|
func (s *Server) startProvider() {
|
||||||
// configure providers
|
|
||||||
if s.globalConfiguration.Docker != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Docker)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Marathon != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Marathon)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.File != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.File)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Rest != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Rest)
|
|
||||||
s.globalConfiguration.Rest.CurrentConfigurations = &s.currentConfigurations
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Consul != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Consul)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.ConsulCatalog != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.ConsulCatalog)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Etcd != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Etcd)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Zookeeper != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Zookeeper)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Boltdb != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Boltdb)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Kubernetes != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Kubernetes)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Mesos != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Mesos)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Eureka != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Eureka)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.ECS != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.ECS)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.Rancher != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.Rancher)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.DynamoDB != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.DynamoDB)
|
|
||||||
}
|
|
||||||
if s.globalConfiguration.ServiceFabric != nil {
|
|
||||||
s.providers = append(s.providers, s.globalConfiguration.ServiceFabric)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) startProviders() {
|
|
||||||
// start providers
|
// start providers
|
||||||
for _, p := range s.providers {
|
providerType := reflect.TypeOf(s.provider)
|
||||||
providerType := reflect.TypeOf(p)
|
jsonConf, err := json.Marshal(s.provider)
|
||||||
jsonConf, _ := json.Marshal(p)
|
if err != nil {
|
||||||
|
log.Debugf("Unable to marshal provider conf %v with error: %v", providerType, err)
|
||||||
|
}
|
||||||
log.Infof("Starting provider %v %s", providerType, jsonConf)
|
log.Infof("Starting provider %v %s", providerType, jsonConf)
|
||||||
currentProvider := p
|
currentProvider := s.provider
|
||||||
safe.Go(func() {
|
safe.Go(func() {
|
||||||
err := currentProvider.Provide(s.configurationChan, s.routinesPool, s.globalConfiguration.Constraints)
|
err := currentProvider.Provide(s.configurationChan, s.routinesPool, s.globalConfiguration.Constraints)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error starting provider %v: %s", providerType, err)
|
log.Errorf("Error starting provider %v: %s", providerType, err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func createClientTLSConfig(entryPointName string, tlsOption *traefikTls.TLS) (*tls.Config, error) {
|
func createClientTLSConfig(entryPointName string, tlsOption *traefikTls.TLS) (*tls.Config, error) {
|
||||||
|
|
|
@ -141,7 +141,7 @@ func TestPrepareServerTimeouts(t *testing.T) {
|
||||||
}
|
}
|
||||||
router := middlewares.NewHandlerSwitcher(mux.NewRouter())
|
router := middlewares.NewHandlerSwitcher(mux.NewRouter())
|
||||||
|
|
||||||
srv := NewServer(test.globalConfig)
|
srv := NewServer(test.globalConfig, nil)
|
||||||
httpServer, _, err := srv.prepareServer(entryPointName, entryPoint, router, nil, nil)
|
httpServer, _, err := srv.prepareServer(entryPointName, entryPoint, router, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Unexpected error when preparing srv: %s", err)
|
t.Fatalf("Unexpected error when preparing srv: %s", err)
|
||||||
|
@ -282,7 +282,7 @@ func setupListenProvider(throttleDuration time.Duration) (server *Server, stop c
|
||||||
ProvidersThrottleDuration: flaeg.Duration(throttleDuration),
|
ProvidersThrottleDuration: flaeg.Duration(throttleDuration),
|
||||||
}
|
}
|
||||||
|
|
||||||
server = NewServer(globalConfig)
|
server = NewServer(globalConfig, nil)
|
||||||
go server.listenProviders(stop)
|
go server.listenProviders(stop)
|
||||||
|
|
||||||
return server, stop, invokeStopChan
|
return server, stop, invokeStopChan
|
||||||
|
@ -475,7 +475,7 @@ func TestServerLoadConfigHealthCheckOptions(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := NewServer(globalConfig)
|
srv := NewServer(globalConfig, nil)
|
||||||
if _, err := srv.loadConfig(dynamicConfigs, globalConfig); err != nil {
|
if _, err := srv.loadConfig(dynamicConfigs, globalConfig); err != nil {
|
||||||
t.Fatalf("got error: %s", err)
|
t.Fatalf("got error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -647,7 +647,7 @@ func TestServerLoadConfigEmptyBasicAuth(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := NewServer(globalConfig)
|
srv := NewServer(globalConfig, nil)
|
||||||
if _, err := srv.loadConfig(dynamicConfigs, globalConfig); err != nil {
|
if _, err := srv.loadConfig(dynamicConfigs, globalConfig); err != nil {
|
||||||
t.Fatalf("got error: %s", err)
|
t.Fatalf("got error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -675,7 +675,7 @@ func TestServerLoadCertificateWithDefaultEntryPoint(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := NewServer(globalConfig)
|
srv := NewServer(globalConfig, nil)
|
||||||
if mapEntryPoints, err := srv.loadConfig(dynamicConfigs, globalConfig); err != nil {
|
if mapEntryPoints, err := srv.loadConfig(dynamicConfigs, globalConfig); err != nil {
|
||||||
t.Fatalf("got error: %s", err)
|
t.Fatalf("got error: %s", err)
|
||||||
} else if mapEntryPoints["https"].certs.Get() == nil {
|
} else if mapEntryPoints["https"].certs.Get() == nil {
|
||||||
|
@ -905,7 +905,7 @@ func TestServerResponseEmptyBackend(t *testing.T) {
|
||||||
}
|
}
|
||||||
dynamicConfigs := types.Configurations{"config": test.dynamicConfig(testServer.URL)}
|
dynamicConfigs := types.Configurations{"config": test.dynamicConfig(testServer.URL)}
|
||||||
|
|
||||||
srv := NewServer(globalConfig)
|
srv := NewServer(globalConfig, nil)
|
||||||
entryPoints, err := srv.loadConfig(dynamicConfigs, globalConfig)
|
entryPoints, err := srv.loadConfig(dynamicConfigs, globalConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error loading config: %s", err)
|
t.Fatalf("error loading config: %s", err)
|
||||||
|
|
Loading…
Reference in a new issue