fix: flaky tests on the configuration watcher

This commit is contained in:
Ludovic Fernandez 2022-11-24 16:00:06 +01:00 committed by GitHub
parent 46c266661c
commit 580e7fa774
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -58,7 +58,7 @@ func (p *mockProvider) Init() error {
func TestNewConfigurationWatcher(t *testing.T) { func TestNewConfigurationWatcher(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop() t.Cleanup(routinesPool.Stop)
pvd := &mockProvider{ pvd := &mockProvider{
messages: []dynamic.Message{{ messages: []dynamic.Message{{
@ -115,7 +115,6 @@ func TestNewConfigurationWatcher(t *testing.T) {
func TestWaitForRequiredProvider(t *testing.T) { func TestWaitForRequiredProvider(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
pvdAggregator := &mockProvider{ pvdAggregator := &mockProvider{
wait: 5 * time.Millisecond, wait: 5 * time.Millisecond,
@ -151,7 +150,9 @@ func TestWaitForRequiredProvider(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed // give some time so that the configuration can be processed
time.Sleep(20 * time.Millisecond) time.Sleep(20 * time.Millisecond)
@ -162,7 +163,6 @@ func TestWaitForRequiredProvider(t *testing.T) {
func TestIgnoreTransientConfiguration(t *testing.T) { func TestIgnoreTransientConfiguration(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
config := &dynamic.Configuration{ config := &dynamic.Configuration{
HTTP: th.BuildConfiguration( HTTP: th.BuildConfiguration(
@ -190,7 +190,9 @@ func TestIgnoreTransientConfiguration(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
watcher.allProvidersConfigs <- dynamic.Message{ watcher.allProvidersConfigs <- dynamic.Message{
ProviderName: "mock", ProviderName: "mock",
@ -243,7 +245,6 @@ func TestIgnoreTransientConfiguration(t *testing.T) {
func TestListenProvidersThrottleProviderConfigReload(t *testing.T) { func TestListenProvidersThrottleProviderConfigReload(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
pvd := &mockProvider{ pvd := &mockProvider{
wait: 10 * time.Millisecond, wait: 10 * time.Millisecond,
@ -274,7 +275,9 @@ func TestListenProvidersThrottleProviderConfigReload(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// Give some time so that the configuration can be processed. // Give some time so that the configuration can be processed.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -287,7 +290,6 @@ func TestListenProvidersThrottleProviderConfigReload(t *testing.T) {
func TestListenProvidersSkipsEmptyConfigs(t *testing.T) { func TestListenProvidersSkipsEmptyConfigs(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
pvd := &mockProvider{ pvd := &mockProvider{
messages: []dynamic.Message{{ProviderName: "mock"}}, messages: []dynamic.Message{{ProviderName: "mock"}},
@ -299,7 +301,9 @@ func TestListenProvidersSkipsEmptyConfigs(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed // give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -307,7 +311,6 @@ func TestListenProvidersSkipsEmptyConfigs(t *testing.T) {
func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) { func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
message := dynamic.Message{ message := dynamic.Message{
ProviderName: "mock", ProviderName: "mock",
@ -331,7 +334,9 @@ func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed // give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -340,7 +345,6 @@ func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) { func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
configuration := &dynamic.Configuration{ configuration := &dynamic.Configuration{
HTTP: th.BuildConfiguration( HTTP: th.BuildConfiguration(
@ -374,7 +378,9 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed // give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -407,7 +413,6 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
func TestListenProvidersIgnoreSameConfig(t *testing.T) { func TestListenProvidersIgnoreSameConfig(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
configuration := &dynamic.Configuration{ configuration := &dynamic.Configuration{
HTTP: th.BuildConfiguration( HTTP: th.BuildConfiguration(
@ -453,8 +458,7 @@ func TestListenProvidersIgnoreSameConfig(t *testing.T) {
configurationReloads++ configurationReloads++
lastConfig = conf lastConfig = conf
// Allows next configurations to be sent by the mock provider // Allows next configurations to be sent by the mock provider as soon as the first configuration message is applied.
// as soon as the first configuration message is applied.
once.Do(func() { once.Do(func() {
pvd.first <- struct{}{} pvd.first <- struct{}{}
// Wait for all configuration messages to pile in // Wait for all configuration messages to pile in
@ -463,7 +467,9 @@ func TestListenProvidersIgnoreSameConfig(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// Wait long enough // Wait long enough
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -498,7 +504,6 @@ func TestListenProvidersIgnoreSameConfig(t *testing.T) {
func TestApplyConfigUnderStress(t *testing.T) { func TestApplyConfigUnderStress(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
watcher := NewConfigurationWatcher(routinesPool, &mockProvider{}, []string{"defaultEP"}, "") watcher := NewConfigurationWatcher(routinesPool, &mockProvider{}, []string{"defaultEP"}, "")
@ -525,15 +530,16 @@ func TestApplyConfigUnderStress(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
// Ensure that at least two configurations have been applied // Ensure that at least two configurations have been applied
// if we simulate being spammed configuration changes by the // if we simulate being spammed configuration changes by the provider(s).
// provider(s). // In theory, checking at least one would be sufficient,
// In theory, checking at least one would be sufficient, but // but checking for two also ensures that we're looping properly,
// checking for two also ensures that we're looping properly,
// and that the whole algo holds, etc. // and that the whole algo holds, etc.
t.Log(configurationReloads) t.Log(configurationReloads)
assert.GreaterOrEqual(t, configurationReloads, 2) assert.GreaterOrEqual(t, configurationReloads, 2)
@ -541,7 +547,6 @@ func TestApplyConfigUnderStress(t *testing.T) {
func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) { func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
configuration := &dynamic.Configuration{ configuration := &dynamic.Configuration{
HTTP: th.BuildConfiguration( HTTP: th.BuildConfiguration(
@ -596,7 +601,9 @@ func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// Wait long enough // Wait long enough
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
@ -631,7 +638,6 @@ func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) {
func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) { func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
configuration := &dynamic.Configuration{ configuration := &dynamic.Configuration{
HTTP: th.BuildConfiguration( HTTP: th.BuildConfiguration(
@ -656,7 +662,9 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed // give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -695,7 +703,6 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
func TestPublishConfigUpdatedByProvider(t *testing.T) { func TestPublishConfigUpdatedByProvider(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
pvdConfiguration := dynamic.Configuration{ pvdConfiguration := dynamic.Configuration{
TCP: &dynamic.TCPConfiguration{ TCP: &dynamic.TCPConfiguration{
@ -725,12 +732,14 @@ func TestPublishConfigUpdatedByProvider(t *testing.T) {
watcher.AddListener(func(configuration dynamic.Configuration) { watcher.AddListener(func(configuration dynamic.Configuration) {
publishedConfigCount++ publishedConfigCount++
// Update the provider configuration published in next dynamic Message which should trigger a new publish. // Update the provider configuration published in next dynamic Message which should trigger a new publishing.
pvdConfiguration.TCP.Routers["bar"] = &dynamic.TCPRouter{} pvdConfiguration.TCP.Routers["bar"] = &dynamic.TCPRouter{}
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed. // give some time so that the configuration can be processed.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -740,7 +749,6 @@ func TestPublishConfigUpdatedByProvider(t *testing.T) {
func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) { func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) {
routinesPool := safe.NewPool(context.Background()) routinesPool := safe.NewPool(context.Background())
defer routinesPool.Stop()
pvd := &mockProvider{ pvd := &mockProvider{
wait: 10 * time.Millisecond, wait: 10 * time.Millisecond,
@ -774,13 +782,15 @@ func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) {
watcher.AddListener(func(configuration dynamic.Configuration) { watcher.AddListener(func(configuration dynamic.Configuration) {
publishedConfigCount++ publishedConfigCount++
// Modify the provided configuration. This should not modify the configuration stored in the configuration // Modify the provided configuration.
// watcher and cause a new publish. // This should not modify the configuration stored in the configuration watcher and therefore there will be no new publishing.
configuration.TCP.Routers["foo@mock"].Rule = "bar" configuration.TCP.Routers["foo@mock"].Rule = "bar"
}) })
watcher.Start() watcher.Start()
defer watcher.Stop()
t.Cleanup(watcher.Stop)
t.Cleanup(routinesPool.Stop)
// give some time so that the configuration can be processed. // give some time so that the configuration can be processed.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)