206 lines
5.4 KiB
Go
206 lines
5.4 KiB
Go
package aggregator
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/traefik/traefik/v3/pkg/config/dynamic"
|
|
"github.com/traefik/traefik/v3/pkg/config/static"
|
|
"github.com/traefik/traefik/v3/pkg/provider"
|
|
"github.com/traefik/traefik/v3/pkg/provider/file"
|
|
"github.com/traefik/traefik/v3/pkg/provider/traefik"
|
|
"github.com/traefik/traefik/v3/pkg/redactor"
|
|
"github.com/traefik/traefik/v3/pkg/safe"
|
|
)
|
|
|
|
// throttled is an optional interface a provider may implement to choose how
// often the aggregator forwards its configuration changes.
//
// When a provider implements throttled, the configuration messages it sends
// are taken into account no more often than the interval returned by
// ThrottleDuration. A zero ThrottleDuration disables throttling for that
// provider altogether.
//
// Providers that do not implement throttled are throttled in accordance with
// the global providersThrottleDuration option.
type throttled interface {
	// ThrottleDuration returns the minimum delay to observe between two
	// forwarded configuration messages; zero means "do not throttle".
	ThrottleDuration() time.Duration
}
|
|
|
|
// maybeThrottledProvide returns the Provide method of the given provider,
|
|
// potentially augmented with some throttling depending on whether and how the
|
|
// provider implements the throttled interface.
|
|
func maybeThrottledProvide(prd provider.Provider, defaultDuration time.Duration) func(chan<- dynamic.Message, *safe.Pool) error {
|
|
providerThrottleDuration := defaultDuration
|
|
if throttled, ok := prd.(throttled); ok {
|
|
// per-provider throttling
|
|
providerThrottleDuration = throttled.ThrottleDuration()
|
|
}
|
|
|
|
if providerThrottleDuration == 0 {
|
|
// throttling disabled
|
|
return prd.Provide
|
|
}
|
|
|
|
return func(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
|
|
rc := newRingChannel()
|
|
pool.GoCtx(func(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg := <-rc.out():
|
|
configurationChan <- msg
|
|
time.Sleep(providerThrottleDuration)
|
|
}
|
|
}
|
|
})
|
|
|
|
return prd.Provide(rc.in(), pool)
|
|
}
|
|
}
|
|
|
|
// ProviderAggregator aggregates providers.
|
|
type ProviderAggregator struct {
|
|
internalProvider provider.Provider
|
|
fileProvider provider.Provider
|
|
providers []provider.Provider
|
|
providersThrottleDuration time.Duration
|
|
}
|
|
|
|
// NewProviderAggregator returns an aggregate of all the providers configured in the static configuration.
|
|
func NewProviderAggregator(conf static.Providers) ProviderAggregator {
|
|
p := ProviderAggregator{
|
|
providersThrottleDuration: time.Duration(conf.ProvidersThrottleDuration),
|
|
}
|
|
|
|
if conf.File != nil {
|
|
p.quietAddProvider(conf.File)
|
|
}
|
|
|
|
if conf.Docker != nil {
|
|
p.quietAddProvider(conf.Docker)
|
|
}
|
|
|
|
if conf.Rest != nil {
|
|
p.quietAddProvider(conf.Rest)
|
|
}
|
|
|
|
if conf.KubernetesIngress != nil {
|
|
p.quietAddProvider(conf.KubernetesIngress)
|
|
}
|
|
|
|
if conf.KubernetesCRD != nil {
|
|
p.quietAddProvider(conf.KubernetesCRD)
|
|
}
|
|
|
|
if conf.KubernetesGateway != nil {
|
|
p.quietAddProvider(conf.KubernetesGateway)
|
|
}
|
|
|
|
if conf.Ecs != nil {
|
|
p.quietAddProvider(conf.Ecs)
|
|
}
|
|
|
|
if conf.ConsulCatalog != nil {
|
|
for _, pvd := range conf.ConsulCatalog.BuildProviders() {
|
|
p.quietAddProvider(pvd)
|
|
}
|
|
}
|
|
|
|
if conf.Nomad != nil {
|
|
for _, pvd := range conf.Nomad.BuildProviders() {
|
|
p.quietAddProvider(pvd)
|
|
}
|
|
}
|
|
|
|
if conf.Consul != nil {
|
|
for _, pvd := range conf.Consul.BuildProviders() {
|
|
p.quietAddProvider(pvd)
|
|
}
|
|
}
|
|
|
|
if conf.Etcd != nil {
|
|
p.quietAddProvider(conf.Etcd)
|
|
}
|
|
|
|
if conf.ZooKeeper != nil {
|
|
p.quietAddProvider(conf.ZooKeeper)
|
|
}
|
|
|
|
if conf.Redis != nil {
|
|
p.quietAddProvider(conf.Redis)
|
|
}
|
|
|
|
if conf.HTTP != nil {
|
|
p.quietAddProvider(conf.HTTP)
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
func (p *ProviderAggregator) quietAddProvider(provider provider.Provider) {
|
|
err := p.AddProvider(provider)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("Error while initializing provider %T", provider)
|
|
}
|
|
}
|
|
|
|
// AddProvider adds a provider in the providers map.
|
|
func (p *ProviderAggregator) AddProvider(provider provider.Provider) error {
|
|
err := provider.Init()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch provider.(type) {
|
|
case *file.Provider:
|
|
p.fileProvider = provider
|
|
case *traefik.Provider:
|
|
p.internalProvider = provider
|
|
default:
|
|
p.providers = append(p.providers, provider)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Init the provider.
|
|
func (p ProviderAggregator) Init() error {
|
|
return nil
|
|
}
|
|
|
|
// Provide calls the provide method of every providers.
|
|
func (p ProviderAggregator) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
|
|
if p.fileProvider != nil {
|
|
p.launchProvider(configurationChan, pool, p.fileProvider)
|
|
}
|
|
|
|
for _, prd := range p.providers {
|
|
prd := prd
|
|
safe.Go(func() {
|
|
p.launchProvider(configurationChan, pool, prd)
|
|
})
|
|
}
|
|
|
|
// internal provider must be the last because we use it to know if all the providers are loaded.
|
|
// ConfigurationWatcher will wait for this requiredProvider before applying configurations.
|
|
if p.internalProvider != nil {
|
|
p.launchProvider(configurationChan, pool, p.internalProvider)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p ProviderAggregator) launchProvider(configurationChan chan<- dynamic.Message, pool *safe.Pool, prd provider.Provider) {
|
|
jsonConf, err := redactor.RemoveCredentials(prd)
|
|
if err != nil {
|
|
log.Debug().Err(err).Msgf("Cannot marshal the provider configuration %T", prd)
|
|
}
|
|
|
|
log.Info().Msgf("Starting provider %T", prd)
|
|
log.Debug().RawJSON("config", []byte(jsonConf)).Msgf("%T provider configuration", prd)
|
|
|
|
if err := maybeThrottledProvide(prd, p.providersThrottleDuration)(configurationChan, pool); err != nil {
|
|
log.Error().Err(err).Msgf("Cannot start the provider %T", prd)
|
|
return
|
|
}
|
|
}
|