traefik/pkg/server/server_configuration.go

283 lines
9.2 KiB
Go
Raw Normal View History

2018-06-11 09:36:03 +00:00
package server
import (
2018-11-14 09:18:03 +00:00
"context"
2018-06-11 09:36:03 +00:00
"encoding/json"
"net/http"
"reflect"
"time"
2018-11-14 09:18:03 +00:00
"github.com/containous/alice"
2018-06-11 09:36:03 +00:00
"github.com/containous/mux"
"github.com/containous/traefik/pkg/config/dynamic"
2019-03-15 08:42:03 +00:00
"github.com/containous/traefik/pkg/log"
"github.com/containous/traefik/pkg/middlewares/accesslog"
"github.com/containous/traefik/pkg/middlewares/requestdecorator"
"github.com/containous/traefik/pkg/middlewares/tracing"
"github.com/containous/traefik/pkg/responsemodifiers"
"github.com/containous/traefik/pkg/server/middleware"
"github.com/containous/traefik/pkg/server/router"
routertcp "github.com/containous/traefik/pkg/server/router/tcp"
"github.com/containous/traefik/pkg/server/service"
"github.com/containous/traefik/pkg/server/service/tcp"
tcpCore "github.com/containous/traefik/pkg/tcp"
2018-06-11 09:36:03 +00:00
"github.com/eapache/channels"
2018-07-03 10:44:04 +00:00
"github.com/sirupsen/logrus"
2018-06-11 09:36:03 +00:00
)
// loadConfiguration manages dynamically routers, middlewares, servers and TLS configurations
func (s *Server) loadConfiguration(configMsg dynamic.Message) {
currentConfigurations := s.currentConfigurations.Get().(dynamic.Configurations)
2018-06-11 09:36:03 +00:00
// Copy configurations to new map so we don't change current if LoadConfig fails
newConfigurations := currentConfigurations.DeepCopy()
2018-06-11 09:36:03 +00:00
newConfigurations[configMsg.ProviderName] = configMsg.Configuration
s.metricsRegistry.ConfigReloadsCounter().Add(1)
handlersTCP := s.loadConfigurationTCP(newConfigurations)
for entryPointName, router := range handlersTCP {
s.entryPointsTCP[entryPointName].switchRouter(router)
2018-11-14 09:18:03 +00:00
}
2018-06-11 09:36:03 +00:00
s.metricsRegistry.LastConfigReloadSuccessGauge().Set(float64(time.Now().Unix()))
2018-06-11 09:36:03 +00:00
s.currentConfigurations.Set(newConfigurations)
for _, listener := range s.configurationListeners {
listener(*configMsg.Configuration)
}
s.postLoadConfiguration()
}
// loadConfigurationTCP returns a new gorilla.mux Route from the specified global configuration and the dynamic
2018-06-11 09:36:03 +00:00
// provider configurations.
func (s *Server) loadConfigurationTCP(configurations dynamic.Configurations) map[string]*tcpCore.Router {
2018-11-14 09:18:03 +00:00
ctx := context.TODO()
2018-06-11 09:36:03 +00:00
var entryPoints []string
for entryPointName := range s.entryPointsTCP {
entryPoints = append(entryPoints, entryPointName)
}
conf := mergeConfiguration(configurations)
2018-06-11 09:36:03 +00:00
s.tlsManager.UpdateConfigs(conf.TLS.Stores, conf.TLS.Options, conf.TLS.Certificates)
rtConf := dynamic.NewRuntimeConfig(conf)
handlersNonTLS, handlersTLS := s.createHTTPHandlers(ctx, rtConf, entryPoints)
routersTCP := s.createTCPRouters(ctx, rtConf, entryPoints, handlersNonTLS, handlersTLS)
rtConf.PopulateUsedBy()
return routersTCP
2018-06-11 09:36:03 +00:00
}
// the given configuration must not be nil. its fields will get mutated.
func (s *Server) createTCPRouters(ctx context.Context, configuration *dynamic.RuntimeConfiguration, entryPoints []string, handlers map[string]http.Handler, handlersTLS map[string]http.Handler) map[string]*tcpCore.Router {
if configuration == nil {
return make(map[string]*tcpCore.Router)
2018-06-11 09:36:03 +00:00
}
serviceManager := tcp.NewManager(configuration)
routerManager := routertcp.NewManager(configuration, serviceManager, handlers, handlersTLS, s.tlsManager)
return routerManager.BuildHandlers(ctx, entryPoints)
}
// createHTTPHandlers returns, for the given configuration and entryPoints, the HTTP handlers for non-TLS connections, and for the TLS ones. the given configuration must not be nil. its fields will get mutated.
func (s *Server) createHTTPHandlers(ctx context.Context, configuration *dynamic.RuntimeConfiguration, entryPoints []string) (map[string]http.Handler, map[string]http.Handler) {
2018-11-14 09:18:03 +00:00
serviceManager := service.NewManager(configuration.Services, s.defaultRoundTripper)
middlewaresBuilder := middleware.NewBuilder(configuration.Middlewares, serviceManager)
responseModifierFactory := responsemodifiers.NewBuilder(configuration.Middlewares)
routerManager := router.NewManager(configuration, serviceManager, middlewaresBuilder, responseModifierFactory)
2018-06-11 09:36:03 +00:00
handlersNonTLS := routerManager.BuildHandlers(ctx, entryPoints, false)
handlersTLS := routerManager.BuildHandlers(ctx, entryPoints, true)
2018-06-11 09:36:03 +00:00
2018-11-14 09:18:03 +00:00
routerHandlers := make(map[string]http.Handler)
for _, entryPointName := range entryPoints {
internalMuxRouter := mux.NewRouter().SkipClean(true)
2018-09-17 18:40:04 +00:00
2018-11-14 09:18:03 +00:00
ctx = log.With(ctx, log.Str(log.EntryPointName, entryPointName))
2018-06-11 09:36:03 +00:00
factory := s.entryPointsTCP[entryPointName].RouteAppenderFactory
2018-11-14 09:18:03 +00:00
if factory != nil {
// FIXME remove currentConfigurations
appender := factory.NewAppender(ctx, middlewaresBuilder, configuration)
2018-11-14 09:18:03 +00:00
appender.Append(internalMuxRouter)
}
2018-06-11 09:36:03 +00:00
if h, ok := handlersNonTLS[entryPointName]; ok {
2018-11-14 09:18:03 +00:00
internalMuxRouter.NotFoundHandler = h
2018-06-11 09:36:03 +00:00
} else {
internalMuxRouter.NotFoundHandler = buildDefaultHTTPRouter()
2018-06-11 09:36:03 +00:00
}
2018-11-14 09:18:03 +00:00
routerHandlers[entryPointName] = internalMuxRouter
2018-06-11 09:36:03 +00:00
2018-11-14 09:18:03 +00:00
chain := alice.New()
2018-06-11 09:36:03 +00:00
2018-11-14 09:18:03 +00:00
if s.accessLoggerMiddleware != nil {
chain = chain.Append(accesslog.WrapHandler(s.accessLoggerMiddleware))
2018-06-11 09:36:03 +00:00
}
2018-11-14 09:18:03 +00:00
if s.tracer != nil {
chain = chain.Append(tracing.WrapEntryPointHandler(ctx, s.tracer, entryPointName))
2018-10-29 17:42:03 +00:00
}
2018-06-11 09:36:03 +00:00
2018-11-14 09:18:03 +00:00
chain = chain.Append(requestdecorator.WrapHandler(s.requestDecorator))
2018-06-11 09:36:03 +00:00
2018-11-14 09:18:03 +00:00
handler, err := chain.Then(internalMuxRouter.NotFoundHandler)
2018-06-11 09:36:03 +00:00
if err != nil {
2018-11-14 09:18:03 +00:00
log.FromContext(ctx).Error(err)
continue
2018-06-11 09:36:03 +00:00
}
2018-11-14 09:18:03 +00:00
internalMuxRouter.NotFoundHandler = handler
handlerTLS, ok := handlersTLS[entryPointName]
if ok {
handlerTLSWithMiddlewares, err := chain.Then(handlerTLS)
if err != nil {
log.FromContext(ctx).Error(err)
continue
}
handlersTLS[entryPointName] = handlerTLSWithMiddlewares
}
}
return routerHandlers, handlersTLS
}
func isEmptyConfiguration(conf *dynamic.Configuration) bool {
if conf == nil {
return true
}
if conf.TCP == nil {
conf.TCP = &dynamic.TCPConfiguration{}
}
if conf.HTTP == nil {
conf.HTTP = &dynamic.HTTPConfiguration{}
2018-06-11 09:36:03 +00:00
}
return conf.HTTP.Routers == nil &&
conf.HTTP.Services == nil &&
conf.HTTP.Middlewares == nil &&
(conf.TLS == nil || conf.TLS.Certificates == nil && conf.TLS.Stores == nil && conf.TLS.Options == nil) &&
conf.TCP.Routers == nil &&
conf.TCP.Services == nil
2018-06-11 09:36:03 +00:00
}
func (s *Server) preLoadConfiguration(configMsg dynamic.Message) {
s.defaultConfigurationValues(configMsg.Configuration.HTTP)
currentConfigurations := s.currentConfigurations.Get().(dynamic.Configurations)
2018-06-11 09:36:03 +00:00
2018-11-14 09:18:03 +00:00
logger := log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName)
2018-07-03 10:44:04 +00:00
if log.GetLevel() == logrus.DebugLevel {
2019-07-12 23:24:03 +00:00
copyConf := configMsg.Configuration.DeepCopy()
if copyConf.TLS != nil {
copyConf.TLS.Certificates = nil
for _, v := range copyConf.TLS.Stores {
v.DefaultCertificate = nil
}
}
jsonConf, err := json.Marshal(copyConf)
if err != nil {
logger.Errorf("Could not marshal dynamic configuration: %v", err)
logger.Debugf("Configuration received from provider %s: [struct] %#v", configMsg.ProviderName, copyConf)
} else {
logger.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf))
}
2018-07-03 10:44:04 +00:00
}
2018-06-11 09:36:03 +00:00
if isEmptyConfiguration(configMsg.Configuration) {
2018-11-14 09:18:03 +00:00
logger.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName)
2018-06-11 09:36:03 +00:00
return
}
if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
2018-11-14 09:18:03 +00:00
logger.Infof("Skipping same configuration for provider %s", configMsg.ProviderName)
2018-06-11 09:36:03 +00:00
return
}
providerConfigUpdateCh, ok := s.providerConfigUpdateMap[configMsg.ProviderName]
if !ok {
providerConfigUpdateCh = make(chan dynamic.Message)
2018-06-11 09:36:03 +00:00
s.providerConfigUpdateMap[configMsg.ProviderName] = providerConfigUpdateCh
s.routinesPool.Go(func(stop chan bool) {
s.throttleProviderConfigReload(s.providersThrottleDuration, s.configurationValidatedChan, providerConfigUpdateCh, stop)
2018-06-11 09:36:03 +00:00
})
}
providerConfigUpdateCh <- configMsg
}
func (s *Server) defaultConfigurationValues(configuration *dynamic.HTTPConfiguration) {
2018-11-14 09:18:03 +00:00
// FIXME create a config hook
2018-06-11 09:36:03 +00:00
}
func (s *Server) listenConfigurations(stop chan bool) {
for {
select {
case <-stop:
return
case configMsg, ok := <-s.configurationValidatedChan:
if !ok || configMsg.Configuration == nil {
return
}
s.loadConfiguration(configMsg)
}
}
}
// throttleProviderConfigReload throttles the configuration reload speed for a single provider.
// It will immediately publish a new configuration and then only publish the next configuration after the throttle duration.
// Note that in the case it receives N new configs in the timeframe of the throttle duration after publishing,
// it will publish the last of the newly received configurations.
func (s *Server) throttleProviderConfigReload(throttle time.Duration, publish chan<- dynamic.Message, in <-chan dynamic.Message, stop chan bool) {
2018-06-11 09:36:03 +00:00
ring := channels.NewRingChannel(1)
defer ring.Close()
s.routinesPool.Go(func(stop chan bool) {
for {
select {
case <-stop:
return
case nextConfig := <-ring.Out():
if config, ok := nextConfig.(dynamic.Message); ok {
2018-09-06 12:24:03 +00:00
publish <- config
time.Sleep(throttle)
}
2018-06-11 09:36:03 +00:00
}
}
})
for {
select {
case <-stop:
return
case nextConfig := <-in:
ring.In() <- nextConfig
}
}
}
func (s *Server) postLoadConfiguration() {
2018-11-14 09:18:03 +00:00
// FIXME metrics
// if s.metricsRegistry.IsEnabled() {
// activeConfig := s.currentConfigurations.Get().(config.Configurations)
// metrics.OnConfigurationUpdate(activeConfig)
// }
2018-06-11 09:36:03 +00:00
}
func buildDefaultHTTPRouter() *mux.Router {
2018-11-14 09:18:03 +00:00
rt := mux.NewRouter()
rt.NotFoundHandler = http.HandlerFunc(http.NotFound)
rt.SkipClean(true)
return rt
}