diff --git a/docs/content/providers/nomad.md b/docs/content/providers/nomad.md index 62e973fb0..0bd1ca706 100644 --- a/docs/content/providers/nomad.md +++ b/docs/content/providers/nomad.md @@ -56,6 +56,8 @@ _Optional, Default=15s_ Defines the polling interval. +!!! note "This option is ignored when the [watch](#watch) mode is enabled." + ```yaml tab="File (YAML)" providers: nomad: @@ -74,6 +76,62 @@ providers: # ... ``` +### `watch` + +_Optional, Default=false_ + +Enables the watch mode to refresh the configuration on a per-event basis. + +```yaml tab="File (YAML)" +providers: + nomad: + watch: true + # ... +``` + +```toml tab="File (TOML)" +[providers.nomad] + watch = true + # ... +``` + +```bash tab="CLI" +--providers.nomad.watch +# ... +``` + +### `throttleDuration` + +_Optional, Default=0s_ + +The `throttleDuration` option defines how often the provider is allowed to handle service events from Nomad. +This prevents a Nomad cluster that updates many times per second from continuously changing your Traefik configuration. + +If left empty, the provider does not apply any throttling and does not drop any Nomad service events. + +The value of `throttleDuration` should be provided in seconds or as a valid duration format, +see [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration). + +!!! warning "This option is only compatible with the [watch](#watch) mode." + +```yaml tab="File (YAML)" +providers: + nomad: + throttleDuration: 2s + # ... +``` + +```toml tab="File (TOML)" +[providers.nomad] + throttleDuration = "2s" + # ... +``` + +```bash tab="CLI" +--providers.nomad.throttleDuration=2s +# ... 
+``` + ### `prefix` _required, Default="traefik"_ diff --git a/docs/content/reference/static-configuration/cli-ref.md b/docs/content/reference/static-configuration/cli-ref.md index 3d6e02908..b0045c025 100644 --- a/docs/content/reference/static-configuration/cli-ref.md +++ b/docs/content/reference/static-configuration/cli-ref.md @@ -918,6 +918,12 @@ Interval for polling Nomad API. (Default: ```15```) `--providers.nomad.stale`: Use stale consistency for catalog reads. (Default: ```false```) +`--providers.nomad.throttleduration`: +Watch throttle duration. (Default: ```0```) + +`--providers.nomad.watch`: +Watch Nomad Service events. (Default: ```false```) + `--providers.plugin.`: Plugins configuration. diff --git a/docs/content/reference/static-configuration/env-ref.md b/docs/content/reference/static-configuration/env-ref.md index 058190eaf..1306419fc 100644 --- a/docs/content/reference/static-configuration/env-ref.md +++ b/docs/content/reference/static-configuration/env-ref.md @@ -918,6 +918,12 @@ Interval for polling Nomad API. (Default: ```15```) `TRAEFIK_PROVIDERS_NOMAD_STALE`: Use stale consistency for catalog reads. (Default: ```false```) +`TRAEFIK_PROVIDERS_NOMAD_THROTTLEDURATION`: +Watch throttle duration. (Default: ```0```) + +`TRAEFIK_PROVIDERS_NOMAD_WATCH`: +Watch Nomad Service events. (Default: ```false```) + `TRAEFIK_PROVIDERS_PLUGIN_`: Plugins configuration. 
diff --git a/docs/content/reference/static-configuration/file.toml b/docs/content/reference/static-configuration/file.toml index dd13bbc0a..94fa46f6d 100644 --- a/docs/content/reference/static-configuration/file.toml +++ b/docs/content/reference/static-configuration/file.toml @@ -203,6 +203,8 @@ exposedByDefault = true refreshInterval = "42s" allowEmptyServices = true + watch = true + throttleDuration = "42s" namespaces = ["foobar", "foobar"] [providers.nomad.endpoint] address = "foobar" diff --git a/docs/content/reference/static-configuration/file.yaml b/docs/content/reference/static-configuration/file.yaml index 89ee83476..9be48940b 100644 --- a/docs/content/reference/static-configuration/file.yaml +++ b/docs/content/reference/static-configuration/file.yaml @@ -236,6 +236,8 @@ providers: exposedByDefault: true refreshInterval: 42s allowEmptyServices: true + watch: true + throttleDuration: 42s namespaces: - foobar - foobar diff --git a/pkg/provider/nomad/nomad.go b/pkg/provider/nomad/nomad.go index 5a595dca6..6f0b4774e 100644 --- a/pkg/provider/nomad/nomad.go +++ b/pkg/provider/nomad/nomad.go @@ -10,6 +10,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/hashicorp/nomad/api" + "github.com/mitchellh/hashstructure" "github.com/rs/zerolog/log" ptypes "github.com/traefik/paerser/types" "github.com/traefik/traefik/v3/pkg/config/dynamic" @@ -93,6 +94,8 @@ type Configuration struct { ExposedByDefault bool `description:"Expose Nomad services by default." json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:"exposedByDefault,omitempty" export:"true"` RefreshInterval ptypes.Duration `description:"Interval for polling Nomad API." json:"refreshInterval,omitempty" toml:"refreshInterval,omitempty" yaml:"refreshInterval,omitempty" export:"true"` AllowEmptyServices bool `description:"Allow the creation of services without endpoints." 
json:"allowEmptyServices,omitempty" toml:"allowEmptyServices,omitempty" yaml:"allowEmptyServices,omitempty" export:"true"` + Watch bool `description:"Watch Nomad Service events." json:"watch,omitempty" toml:"watch,omitempty" yaml:"watch,omitempty" export:"true"` + ThrottleDuration ptypes.Duration `description:"Watch throttle duration." json:"throttleDuration,omitempty" toml:"throttleDuration,omitempty" yaml:"throttleDuration,omitempty" export:"true"` } // SetDefaults sets the default values for the Nomad Traefik Provider Configuration. @@ -117,7 +120,7 @@ func (c *Configuration) SetDefaults() { c.ExposedByDefault = true c.RefreshInterval = ptypes.Duration(15 * time.Second) c.DefaultRule = defaultTemplateRule - c.AllowEmptyServices = false + c.ThrottleDuration = ptypes.Duration(0) } type EndpointConfig struct { @@ -139,6 +142,8 @@ type Provider struct { namespace string client *api.Client // client for Nomad API defaultRuleTpl *template.Template // default routing rule + + lastConfiguration safe.Safe } // SetDefaults sets the default values for the Nomad Traefik Provider. @@ -152,6 +157,10 @@ func (p *Provider) Init() error { return errors.New("wildcard namespace not supported") } + if p.ThrottleDuration > 0 && !p.Watch { + return errors.New("throttle duration should not be used with polling mode") + } + defaultRuleTpl, err := provider.MakeDefaultRuleTemplate(p.DefaultRule, nil) if err != nil { return fmt.Errorf("error while parsing default rule: %w", err) @@ -183,32 +192,63 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. 
ctx, cancel := context.WithCancel(ctxLog) defer cancel() - // load initial configuration - if err := p.loadConfiguration(ctx, configurationChan); err != nil { - return fmt.Errorf("failed to load initial nomad services: %w", err) + serviceEventsChan, err := p.pollOrWatch(ctx) + if err != nil { + return fmt.Errorf("watching Nomad events: %w", err) } - // issue periodic refreshes in the background - // (Nomad does not support Watch style observations) - ticker := time.NewTicker(time.Duration(p.RefreshInterval)) - defer ticker.Stop() + throttleDuration := time.Duration(p.ThrottleDuration) + throttledChan := throttleEvents(ctx, throttleDuration, pool, serviceEventsChan) + if throttledChan != nil { + serviceEventsChan = throttledChan + } + + conf, err := p.loadConfiguration(ctx) + if err != nil { + return fmt.Errorf("loading configuration: %w", err) + } + if _, err := p.updateLastConfiguration(conf); err != nil { + return fmt.Errorf("updating last configuration: %w", err) + } + + configurationChan <- dynamic.Message{ + ProviderName: p.name, + Configuration: conf, + } - // enter loop where we wait for and respond to notifications for { select { case <-ctx.Done(): return nil - case <-ticker.C: - } - // load services due to refresh - if err := p.loadConfiguration(ctx, configurationChan); err != nil { - return fmt.Errorf("failed to refresh nomad services: %w", err) + case event := <-serviceEventsChan: + conf, err = p.loadConfiguration(ctx) + if err != nil { + return fmt.Errorf("loading configuration: %w", err) + } + updated, err := p.updateLastConfiguration(conf) + if err != nil { + return fmt.Errorf("updating last configuration: %w", err) + } + if !updated { + logger.Debug().Msgf("Skipping Nomad event %d with no changes", event.Index) + continue + } + + configurationChan <- dynamic.Message{ + ProviderName: p.name, + Configuration: conf, + } + + // If we're throttling, we sleep here for the throttle duration to + // enforce that we don't refresh faster than our throttle. 
time.Sleep + // returns immediately if p.ThrottleDuration is 0 (no throttle). + time.Sleep(throttleDuration) } } } failure := func(err error, d time.Duration) { - logger.Error().Err(err).Msgf("Provider connection error, retrying in %s", d) + logger.Error().Err(err).Msgf("Loading configuration, retrying in %s", d) } if retryErr := backoff.RetryNotify( @@ -223,27 +263,70 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. return nil } -func (p *Provider) loadConfiguration(ctx context.Context, configurationC chan<- dynamic.Message) error { +func (p *Provider) pollOrWatch(ctx context.Context) (<-chan *api.Events, error) { + if p.Watch { + return p.client.EventStream().Stream(ctx, + map[api.Topic][]string{ + api.TopicService: {"*"}, + }, + 0, + &api.QueryOptions{ + Namespace: p.namespace, + }, + ) + } + + serviceEventsChan := make(chan *api.Events, 1) + + go func() { + ticker := time.NewTicker(time.Duration(p.RefreshInterval)) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case t := <-ticker.C: + serviceEventsChan <- &api.Events{ + Index: uint64(t.UnixNano()), + } + } + } + }() + + return serviceEventsChan, nil +} + +func (p *Provider) loadConfiguration(ctx context.Context) (*dynamic.Configuration, error) { var items []item var err error if p.AllowEmptyServices { items, err = p.getNomadServiceDataWithEmptyServices(ctx) if err != nil { - return err + return nil, err } } else { items, err = p.getNomadServiceData(ctx) if err != nil { - return err + return nil, err } } - configurationC <- dynamic.Message{ - ProviderName: p.name, - Configuration: p.buildConfig(ctx, items), + return p.buildConfig(ctx, items), nil +} + +func (p *Provider) updateLastConfiguration(conf *dynamic.Configuration) (bool, error) { + confHash, err := hashstructure.Hash(conf, nil) + if err != nil { + return false, fmt.Errorf("hashing the configuration: %w", err) } - return nil + if p.lastConfiguration.Get() == confHash { + return false, nil + } 
+ + p.lastConfiguration.Set(confHash) + return true, nil } func (p *Provider) getNomadServiceData(ctx context.Context) ([]item, error) { @@ -453,3 +536,38 @@ func createClient(namespace string, endpoint *EndpointConfig) (*api.Client, erro return api.NewClient(&config) } + +// Copied from the Kubernetes provider. +func throttleEvents(ctx context.Context, throttleDuration time.Duration, pool *safe.Pool, eventsChan <-chan *api.Events) chan *api.Events { + if throttleDuration == 0 { + return nil + } + + // Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling). + eventsChanBuffered := make(chan *api.Events, 1) + + // Run a goroutine that reads events from eventsChan and does a + // non-blocking write to eventsChanBuffered. This guarantees that writing to + // eventsChan will never block, and that eventsChanBuffered will have + // something in it if there's been an event since we read from that channel. + pool.GoCtx(func(ctxPool context.Context) { + for { + select { + case <-ctxPool.Done(): + return + case nextEvent := <-eventsChan: + select { + case eventsChanBuffered <- nextEvent: + default: + // We already have an event in eventsChanBuffered, so we'll + // do a refresh as soon as our throttle allows us to. It's fine + // to drop the event and keep whatever's in the buffer -- we + // don't do different things for different events. + log.Ctx(ctx).Debug().Msgf("Dropping event %d due to throttling", nextEvent.Index) + } + } + } + }) + + return eventsChanBuffered +}