Improve setup readability.

Co-authored-by: Ludovic Fernandez <ldez@users.noreply.github.com>
This commit is contained in:
Julien Salleyron 2020-12-01 10:04:04 +01:00 committed by GitHub
parent b0aa27db31
commit b5db753e11
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 113 additions and 82 deletions

View file

@ -7,6 +7,15 @@ import (
const outputDir = "./plugins-storage/" const outputDir = "./plugins-storage/"
func createPluginBuilder(staticConfiguration *static.Configuration) (*plugins.Builder, error) {
client, plgs, devPlugin, err := initPlugins(staticConfiguration)
if err != nil {
return nil, err
}
return plugins.NewBuilder(client, plgs, devPlugin)
}
func initPlugins(staticCfg *static.Configuration) (*plugins.Client, map[string]plugins.Descriptor, *plugins.DevPlugin, error) { func initPlugins(staticCfg *static.Configuration) (*plugins.Client, map[string]plugins.Descriptor, *plugins.DevPlugin, error) {
if !isPilotEnabled(staticCfg) || !hasPlugins(staticCfg) { if !isPilotEnabled(staticCfg) || !hasPlugins(staticCfg) {
return nil, map[string]plugins.Descriptor{}, nil, nil return nil, map[string]plugins.Descriptor{}, nil, nil

View file

@ -29,7 +29,6 @@ import (
"github.com/traefik/traefik/v2/pkg/metrics" "github.com/traefik/traefik/v2/pkg/metrics"
"github.com/traefik/traefik/v2/pkg/middlewares/accesslog" "github.com/traefik/traefik/v2/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v2/pkg/pilot" "github.com/traefik/traefik/v2/pkg/pilot"
"github.com/traefik/traefik/v2/pkg/plugins"
"github.com/traefik/traefik/v2/pkg/provider/acme" "github.com/traefik/traefik/v2/pkg/provider/acme"
"github.com/traefik/traefik/v2/pkg/provider/aggregator" "github.com/traefik/traefik/v2/pkg/provider/aggregator"
"github.com/traefik/traefik/v2/pkg/provider/traefik" "github.com/traefik/traefik/v2/pkg/provider/traefik"
@ -174,18 +173,20 @@ func runCmd(staticConfiguration *static.Configuration) error {
func setupServer(staticConfiguration *static.Configuration) (*server.Server, error) { func setupServer(staticConfiguration *static.Configuration) (*server.Server, error) {
providerAggregator := aggregator.NewProviderAggregator(*staticConfiguration.Providers) providerAggregator := aggregator.NewProviderAggregator(*staticConfiguration.Providers)
ctx := context.Background()
routinesPool := safe.NewPool(ctx)
// adds internal provider // adds internal provider
err := providerAggregator.AddProvider(traefik.New(*staticConfiguration)) err := providerAggregator.AddProvider(traefik.New(*staticConfiguration))
if err != nil { if err != nil {
return nil, err return nil, err
} }
// ACME
tlsManager := traefiktls.NewManager() tlsManager := traefiktls.NewManager()
httpChallengeProvider := acme.NewChallengeHTTP() httpChallengeProvider := acme.NewChallengeHTTP()
tlsChallengeProvider := acme.NewChallengeTLSALPN(time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration)) tlsChallengeProvider := acme.NewChallengeTLSALPN(time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration))
err = providerAggregator.AddProvider(tlsChallengeProvider) err = providerAggregator.AddProvider(tlsChallengeProvider)
if err != nil { if err != nil {
return nil, err return nil, err
@ -193,6 +194,8 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err
acmeProviders := initACMEProvider(staticConfiguration, &providerAggregator, tlsManager, httpChallengeProvider, tlsChallengeProvider) acmeProviders := initACMEProvider(staticConfiguration, &providerAggregator, tlsManager, httpChallengeProvider, tlsChallengeProvider)
// Entrypoints
serverEntryPointsTCP, err := server.NewTCPEntryPoints(staticConfiguration.EntryPoints) serverEntryPointsTCP, err := server.NewTCPEntryPoints(staticConfiguration.EntryPoints)
if err != nil { if err != nil {
return nil, err return nil, err
@ -203,28 +206,113 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err
return nil, err return nil, err
} }
ctx := context.Background() // Pilot
routinesPool := safe.NewPool(ctx)
metricRegistries := registerMetricClients(staticConfiguration.Metrics)
var aviator *pilot.Pilot var aviator *pilot.Pilot
var pilotRegistry *metrics.PilotRegistry
if isPilotEnabled(staticConfiguration) { if isPilotEnabled(staticConfiguration) {
pilotRegistry := metrics.RegisterPilot() pilotRegistry = metrics.RegisterPilot()
aviator = pilot.New(staticConfiguration.Pilot.Token, pilotRegistry, routinesPool) aviator = pilot.New(staticConfiguration.Pilot.Token, pilotRegistry, routinesPool)
routinesPool.GoCtx(func(ctx context.Context) { routinesPool.GoCtx(func(ctx context.Context) {
aviator.Tick(ctx) aviator.Tick(ctx)
}) })
metricRegistries = append(metricRegistries, pilotRegistry)
} }
// Plugins
pluginBuilder, err := createPluginBuilder(staticConfiguration)
if err != nil {
return nil, err
}
// Metrics
metricRegistries := registerMetricClients(staticConfiguration.Metrics)
if pilotRegistry != nil {
metricRegistries = append(metricRegistries, pilotRegistry)
}
metricsRegistry := metrics.NewMultiRegistry(metricRegistries) metricsRegistry := metrics.NewMultiRegistry(metricRegistries)
// Service manager factory
roundTripperManager := service.NewRoundTripperManager()
acmeHTTPHandler := getHTTPChallengeHandler(acmeProviders, httpChallengeProvider)
managerFactory := service.NewManagerFactory(*staticConfiguration, routinesPool, metricsRegistry, roundTripperManager, acmeHTTPHandler)
// Router factory
accessLog := setupAccessLog(staticConfiguration.AccessLog) accessLog := setupAccessLog(staticConfiguration.AccessLog)
chainBuilder := middleware.NewChainBuilder(*staticConfiguration, metricsRegistry, accessLog) chainBuilder := middleware.NewChainBuilder(*staticConfiguration, metricsRegistry, accessLog)
roundTripperManager := service.NewRoundTripperManager() routerFactory := server.NewRouterFactory(*staticConfiguration, managerFactory, tlsManager, chainBuilder, pluginBuilder)
// Watcher
watcher := server.NewConfigurationWatcher(
routinesPool,
providerAggregator,
time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration),
getDefaultsEntrypoints(staticConfiguration),
)
// TLS
watcher.AddListener(func(conf dynamic.Configuration) {
ctx := context.Background()
tlsManager.UpdateConfigs(ctx, conf.TLS.Stores, conf.TLS.Options, conf.TLS.Certificates)
})
// Metrics
watcher.AddListener(func(_ dynamic.Configuration) {
metricsRegistry.ConfigReloadsCounter().Add(1)
metricsRegistry.LastConfigReloadSuccessGauge().Set(float64(time.Now().Unix()))
})
// Server Transports
watcher.AddListener(func(conf dynamic.Configuration) {
roundTripperManager.Update(conf.HTTP.ServersTransports)
})
// Switch router
watcher.AddListener(switchRouter(routerFactory, serverEntryPointsTCP, serverEntryPointsUDP, aviator))
// Metrics
if metricsRegistry.IsEpEnabled() || metricsRegistry.IsSvcEnabled() {
var eps []string
for key := range serverEntryPointsTCP {
eps = append(eps, key)
}
watcher.AddListener(func(conf dynamic.Configuration) {
metrics.OnConfigurationUpdate(conf, eps)
})
}
// TLS challenge
watcher.AddListener(tlsChallengeProvider.ListenConfiguration)
// ACME
resolverNames := map[string]struct{}{}
for _, p := range acmeProviders {
resolverNames[p.ResolverName] = struct{}{}
watcher.AddListener(p.ListenConfiguration)
}
// Certificate resolver logs
watcher.AddListener(func(config dynamic.Configuration) {
for rtName, rt := range config.HTTP.Routers {
if rt.TLS == nil || rt.TLS.CertResolver == "" {
continue
}
if _, ok := resolverNames[rt.TLS.CertResolver]; !ok {
log.WithoutContext().Errorf("the router %s uses a non-existent resolver: %s", rtName, rt.TLS.CertResolver)
}
}
})
return server.NewServer(routinesPool, serverEntryPointsTCP, serverEntryPointsUDP, watcher, chainBuilder, accessLog), nil
}
func getHTTPChallengeHandler(acmeProviders []*acme.Provider, httpChallengeProvider http.Handler) http.Handler {
var acmeHTTPHandler http.Handler var acmeHTTPHandler http.Handler
for _, p := range acmeProviders { for _, p := range acmeProviders {
if p != nil && p.HTTPChallenge != nil { if p != nil && p.HTTPChallenge != nil {
@ -232,21 +320,10 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err
break break
} }
} }
return acmeHTTPHandler
managerFactory := service.NewManagerFactory(*staticConfiguration, routinesPool, metricsRegistry, roundTripperManager, acmeHTTPHandler)
client, plgs, devPlugin, err := initPlugins(staticConfiguration)
if err != nil {
return nil, err
} }
pluginBuilder, err := plugins.NewBuilder(client, plgs, devPlugin) func getDefaultsEntrypoints(staticConfiguration *static.Configuration) []string {
if err != nil {
return nil, err
}
routerFactory := server.NewRouterFactory(*staticConfiguration, managerFactory, tlsManager, chainBuilder, pluginBuilder)
var defaultEntryPoints []string var defaultEntryPoints []string
for name, cfg := range staticConfiguration.EntryPoints { for name, cfg := range staticConfiguration.EntryPoints {
protocol, err := cfg.GetProtocol() protocol, err := cfg.GetProtocol()
@ -261,62 +338,7 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err
} }
sort.Strings(defaultEntryPoints) sort.Strings(defaultEntryPoints)
return defaultEntryPoints
watcher := server.NewConfigurationWatcher(
routinesPool,
providerAggregator,
time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration),
defaultEntryPoints,
)
watcher.AddListener(func(conf dynamic.Configuration) {
ctx := context.Background()
tlsManager.UpdateConfigs(ctx, conf.TLS.Stores, conf.TLS.Options, conf.TLS.Certificates)
})
watcher.AddListener(func(_ dynamic.Configuration) {
metricsRegistry.ConfigReloadsCounter().Add(1)
metricsRegistry.LastConfigReloadSuccessGauge().Set(float64(time.Now().Unix()))
})
watcher.AddListener(func(conf dynamic.Configuration) {
roundTripperManager.Update(conf.HTTP.ServersTransports)
})
watcher.AddListener(switchRouter(routerFactory, serverEntryPointsTCP, serverEntryPointsUDP, aviator))
watcher.AddListener(func(conf dynamic.Configuration) {
if metricsRegistry.IsEpEnabled() || metricsRegistry.IsSvcEnabled() {
var eps []string
for key := range serverEntryPointsTCP {
eps = append(eps, key)
}
metrics.OnConfigurationUpdate(conf, eps)
}
})
watcher.AddListener(tlsChallengeProvider.ListenConfiguration)
resolverNames := map[string]struct{}{}
for _, p := range acmeProviders {
resolverNames[p.ResolverName] = struct{}{}
watcher.AddListener(p.ListenConfiguration)
}
watcher.AddListener(func(config dynamic.Configuration) {
for rtName, rt := range config.HTTP.Routers {
if rt.TLS == nil || rt.TLS.CertResolver == "" {
continue
}
if _, ok := resolverNames[rt.TLS.CertResolver]; !ok {
log.WithoutContext().Errorf("the router %s uses a non-existent resolver: %s", rtName, rt.TLS.CertResolver)
}
}
})
return server.NewServer(routinesPool, serverEntryPointsTCP, serverEntryPointsUDP, watcher, chainBuilder, accessLog), nil
} }
func switchRouter(routerFactory *server.RouterFactory, serverEntryPointsTCP server.TCPEntryPoints, serverEntryPointsUDP server.UDPEntryPoints, aviator *pilot.Pilot) func(conf dynamic.Configuration) { func switchRouter(routerFactory *server.RouterFactory, serverEntryPointsTCP server.TCPEntryPoints, serverEntryPointsUDP server.UDPEntryPoints, aviator *pilot.Pilot) func(conf dynamic.Configuration) {