fix: wait for file and internal before applying configurations
Co-authored-by: Ludovic Fernandez <ldez@users.noreply.github.com>
This commit is contained in:
parent
32500773b8
commit
dd0701dd16
5 changed files with 25 additions and 17 deletions
|
@ -256,6 +256,7 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err
|
||||||
providerAggregator,
|
providerAggregator,
|
||||||
time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration),
|
time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration),
|
||||||
getDefaultsEntrypoints(staticConfiguration),
|
getDefaultsEntrypoints(staticConfiguration),
|
||||||
|
"internal",
|
||||||
)
|
)
|
||||||
|
|
||||||
// TLS
|
// TLS
|
||||||
|
|
|
@ -119,10 +119,6 @@ func (p ProviderAggregator) Init() error {
|
||||||
|
|
||||||
// Provide calls the provide method of every providers.
|
// Provide calls the provide method of every providers.
|
||||||
func (p ProviderAggregator) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
|
func (p ProviderAggregator) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
|
||||||
if p.internalProvider != nil {
|
|
||||||
launchProvider(configurationChan, pool, p.internalProvider)
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.fileProvider != nil {
|
if p.fileProvider != nil {
|
||||||
launchProvider(configurationChan, pool, p.fileProvider)
|
launchProvider(configurationChan, pool, p.fileProvider)
|
||||||
}
|
}
|
||||||
|
@ -134,6 +130,12 @@ func (p ProviderAggregator) Provide(configurationChan chan<- dynamic.Message, po
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// internal provider must be the last because we use it to know if all the providers are loaded.
|
||||||
|
// ConfigurationWatcher will wait for this requiredProvider before applying configurations.
|
||||||
|
if p.internalProvider != nil {
|
||||||
|
launchProvider(configurationChan, pool, p.internalProvider)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,12 +32,11 @@ func TestProviderAggregator_Provide(t *testing.T) {
|
||||||
errCh <- aggregator.Provide(cfgCh, pool)
|
errCh <- aggregator.Provide(cfgCh, pool)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Make sure the internal provider is always called first, followed by the file provider.
|
// Make sure the file provider is always called first.
|
||||||
requireReceivedMessageFromProviders(t, cfgCh, []string{"internal"})
|
|
||||||
requireReceivedMessageFromProviders(t, cfgCh, []string{"file"})
|
requireReceivedMessageFromProviders(t, cfgCh, []string{"file"})
|
||||||
|
|
||||||
// Check if all providers have been called, the order doesn't matter.
|
// Check if all providers have been called, the order doesn't matter.
|
||||||
requireReceivedMessageFromProviders(t, cfgCh, []string{"salad", "tomato", "onion"})
|
requireReceivedMessageFromProviders(t, cfgCh, []string{"salad", "tomato", "onion", "internal"})
|
||||||
|
|
||||||
require.NoError(t, <-errCh)
|
require.NoError(t, <-errCh)
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ type ConfigurationWatcher struct {
|
||||||
configurationValidatedChan chan dynamic.Message
|
configurationValidatedChan chan dynamic.Message
|
||||||
providerConfigUpdateMap map[string]chan dynamic.Message
|
providerConfigUpdateMap map[string]chan dynamic.Message
|
||||||
|
|
||||||
|
requiredProvider string
|
||||||
configurationListeners []func(dynamic.Configuration)
|
configurationListeners []func(dynamic.Configuration)
|
||||||
|
|
||||||
routinesPool *safe.Pool
|
routinesPool *safe.Pool
|
||||||
|
@ -39,6 +40,7 @@ func NewConfigurationWatcher(
|
||||||
pvd provider.Provider,
|
pvd provider.Provider,
|
||||||
providersThrottleDuration time.Duration,
|
providersThrottleDuration time.Duration,
|
||||||
defaultEntryPoints []string,
|
defaultEntryPoints []string,
|
||||||
|
requiredProvider string,
|
||||||
) *ConfigurationWatcher {
|
) *ConfigurationWatcher {
|
||||||
watcher := &ConfigurationWatcher{
|
watcher := &ConfigurationWatcher{
|
||||||
provider: pvd,
|
provider: pvd,
|
||||||
|
@ -48,6 +50,7 @@ func NewConfigurationWatcher(
|
||||||
providersThrottleDuration: providersThrottleDuration,
|
providersThrottleDuration: providersThrottleDuration,
|
||||||
routinesPool: routinesPool,
|
routinesPool: routinesPool,
|
||||||
defaultEntryPoints: defaultEntryPoints,
|
defaultEntryPoints: defaultEntryPoints,
|
||||||
|
requiredProvider: requiredProvider,
|
||||||
}
|
}
|
||||||
|
|
||||||
currentConfigurations := make(dynamic.Configurations)
|
currentConfigurations := make(dynamic.Configurations)
|
||||||
|
@ -146,10 +149,13 @@ func (c *ConfigurationWatcher) loadMessage(configMsg dynamic.Message) {
|
||||||
conf := mergeConfiguration(newConfigurations, c.defaultEntryPoints)
|
conf := mergeConfiguration(newConfigurations, c.defaultEntryPoints)
|
||||||
conf = applyModel(conf)
|
conf = applyModel(conf)
|
||||||
|
|
||||||
|
// We wait for first configuration of the require provider before applying configurations.
|
||||||
|
if _, ok := newConfigurations[c.requiredProvider]; c.requiredProvider == "" || ok {
|
||||||
for _, listener := range c.configurationListeners {
|
for _, listener := range c.configurationListeners {
|
||||||
listener(conf)
|
listener(conf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
|
func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
|
||||||
logger := log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName)
|
logger := log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName)
|
||||||
|
|
|
@ -55,7 +55,7 @@ func TestNewConfigurationWatcher(t *testing.T) {
|
||||||
}},
|
}},
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{})
|
watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{}, "")
|
||||||
|
|
||||||
run := make(chan struct{})
|
run := make(chan struct{})
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ func TestListenProvidersThrottleProviderConfigReload(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{})
|
watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}, "")
|
||||||
|
|
||||||
publishedConfigCount := 0
|
publishedConfigCount := 0
|
||||||
watcher.AddListener(func(_ dynamic.Configuration) {
|
watcher.AddListener(func(_ dynamic.Configuration) {
|
||||||
|
@ -136,7 +136,7 @@ func TestListenProvidersSkipsEmptyConfigs(t *testing.T) {
|
||||||
messages: []dynamic.Message{{ProviderName: "mock"}},
|
messages: []dynamic.Message{{ProviderName: "mock"}},
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{})
|
watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{}, "")
|
||||||
watcher.AddListener(func(_ dynamic.Configuration) {
|
watcher.AddListener(func(_ dynamic.Configuration) {
|
||||||
t.Error("An empty configuration was published but it should not")
|
t.Error("An empty configuration was published but it should not")
|
||||||
})
|
})
|
||||||
|
@ -162,7 +162,7 @@ func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
|
||||||
messages: []dynamic.Message{message, message},
|
messages: []dynamic.Message{message, message},
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{})
|
watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{}, "")
|
||||||
|
|
||||||
alreadyCalled := false
|
alreadyCalled := false
|
||||||
watcher.AddListener(func(_ dynamic.Configuration) {
|
watcher.AddListener(func(_ dynamic.Configuration) {
|
||||||
|
@ -205,7 +205,7 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher := NewConfigurationWatcher(routinesPool, pvd, 15*time.Millisecond, []string{"defaultEP"})
|
watcher := NewConfigurationWatcher(routinesPool, pvd, 15*time.Millisecond, []string{"defaultEP"}, "")
|
||||||
|
|
||||||
var lastConfig dynamic.Configuration
|
var lastConfig dynamic.Configuration
|
||||||
watcher.AddListener(func(conf dynamic.Configuration) {
|
watcher.AddListener(func(conf dynamic.Configuration) {
|
||||||
|
@ -260,7 +260,7 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{"defaultEP"})
|
watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{"defaultEP"}, "")
|
||||||
|
|
||||||
var publishedProviderConfig dynamic.Configuration
|
var publishedProviderConfig dynamic.Configuration
|
||||||
|
|
||||||
|
@ -327,7 +327,7 @@ func TestPublishConfigUpdatedByProvider(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{})
|
watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}, "")
|
||||||
|
|
||||||
publishedConfigCount := 0
|
publishedConfigCount := 0
|
||||||
watcher.AddListener(func(configuration dynamic.Configuration) {
|
watcher.AddListener(func(configuration dynamic.Configuration) {
|
||||||
|
@ -375,7 +375,7 @@ func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{})
|
watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}, "")
|
||||||
|
|
||||||
publishedConfigCount := 0
|
publishedConfigCount := 0
|
||||||
watcher.AddListener(func(configuration dynamic.Configuration) {
|
watcher.AddListener(func(configuration dynamic.Configuration) {
|
||||||
|
|
Loading…
Reference in a new issue