Fix race condition issues with provided dynamic configuration
* tests: add tests to show race condition on provider config * fix: store a deep copy of previous provider config * fix: send a deep copy of provdier config to watcher listener
This commit is contained in:
parent
607cda779d
commit
4d71f682b3
2 changed files with 95 additions and 2 deletions
|
@ -224,8 +224,8 @@ func (c *ConfigurationWatcher) throttleProviderConfigReload(ctx context.Context,
|
|||
logger.Info("Skipping same configuration")
|
||||
continue
|
||||
}
|
||||
previousConfig = nextConfig
|
||||
ring.In() <- nextConfig
|
||||
previousConfig = *nextConfig.DeepCopy()
|
||||
ring.In() <- *nextConfig.DeepCopy()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -301,3 +301,96 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
|
|||
|
||||
assert.Equal(t, expected, publishedProviderConfig)
|
||||
}
|
||||
|
||||
func TestPublishConfigUpdatedByProvider(t *testing.T) {
|
||||
routinesPool := safe.NewPool(context.Background())
|
||||
|
||||
pvdConfiguration := dynamic.Configuration{
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: map[string]*dynamic.TCPRouter{
|
||||
"foo": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
pvd := &mockProvider{
|
||||
wait: 10 * time.Millisecond,
|
||||
messages: []dynamic.Message{
|
||||
{
|
||||
ProviderName: "mock",
|
||||
Configuration: &pvdConfiguration,
|
||||
},
|
||||
{
|
||||
ProviderName: "mock",
|
||||
Configuration: &pvdConfiguration,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{})
|
||||
|
||||
publishedConfigCount := 0
|
||||
watcher.AddListener(func(configuration dynamic.Configuration) {
|
||||
publishedConfigCount++
|
||||
|
||||
// Update the provider configuration published in next dynamic Message which should trigger a new publish.
|
||||
pvdConfiguration.TCP.Routers["bar"] = &dynamic.TCPRouter{}
|
||||
})
|
||||
|
||||
watcher.Start()
|
||||
defer watcher.Stop()
|
||||
|
||||
// give some time so that the configuration can be processed.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
assert.Equal(t, 2, publishedConfigCount)
|
||||
}
|
||||
|
||||
func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) {
|
||||
routinesPool := safe.NewPool(context.Background())
|
||||
|
||||
pvd := &mockProvider{
|
||||
wait: 10 * time.Millisecond,
|
||||
messages: []dynamic.Message{
|
||||
{
|
||||
ProviderName: "mock",
|
||||
Configuration: &dynamic.Configuration{
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: map[string]*dynamic.TCPRouter{
|
||||
"foo": {},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ProviderName: "mock",
|
||||
Configuration: &dynamic.Configuration{
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: map[string]*dynamic.TCPRouter{
|
||||
"foo": {},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{})
|
||||
|
||||
publishedConfigCount := 0
|
||||
watcher.AddListener(func(configuration dynamic.Configuration) {
|
||||
publishedConfigCount++
|
||||
|
||||
// Modify the provided configuration. This should not modify the configuration stored in the configuration
|
||||
// watcher and cause a new publish.
|
||||
configuration.TCP.Routers["foo@mock"].Rule = "bar"
|
||||
})
|
||||
|
||||
watcher.Start()
|
||||
defer watcher.Stop()
|
||||
|
||||
// give some time so that the configuration can be processed.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
assert.Equal(t, 1, publishedConfigCount)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue