From a8df674dcf77909e6e0f9c0452abf7a7c6857571 Mon Sep 17 00:00:00 2001 From: Ludovic Fernandez Date: Wed, 7 Dec 2022 10:56:05 +0100 Subject: [PATCH] fix: flaky tests --- pkg/server/configurationwatcher_test.go | 78 +++++++++++++----------- pkg/server/server_entrypoint_tcp_test.go | 22 +++---- pkg/server/server_entrypoint_udp_test.go | 32 +++++----- 3 files changed, 71 insertions(+), 61 deletions(-) diff --git a/pkg/server/configurationwatcher_test.go b/pkg/server/configurationwatcher_test.go index afd886e84..da8d39c4d 100644 --- a/pkg/server/configurationwatcher_test.go +++ b/pkg/server/configurationwatcher_test.go @@ -58,7 +58,7 @@ func (p *mockProvider) Init() error { func TestNewConfigurationWatcher(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() + t.Cleanup(routinesPool.Stop) pvd := &mockProvider{ messages: []dynamic.Message{{ @@ -115,7 +115,6 @@ func TestNewConfigurationWatcher(t *testing.T) { func TestWaitForRequiredProvider(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() pvdAggregator := &mockProvider{ wait: 5 * time.Millisecond, @@ -151,7 +150,9 @@ func TestWaitForRequiredProvider(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // give some time so that the configuration can be processed time.Sleep(20 * time.Millisecond) @@ -162,7 +163,6 @@ func TestWaitForRequiredProvider(t *testing.T) { func TestIgnoreTransientConfiguration(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() config := &dynamic.Configuration{ HTTP: th.BuildConfiguration( @@ -190,7 +190,9 @@ func TestIgnoreTransientConfiguration(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) watcher.allProvidersConfigs <- dynamic.Message{ ProviderName: "mock", @@ -243,7 +245,6 @@ func TestIgnoreTransientConfiguration(t *testing.T) { func TestListenProvidersThrottleProviderConfigReload(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() pvd := &mockProvider{ wait: 10 * time.Millisecond, @@ -274,7 +275,9 @@ func TestListenProvidersThrottleProviderConfigReload(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // Give some time so that the configuration can be processed. time.Sleep(100 * time.Millisecond) @@ -287,7 +290,6 @@ func TestListenProvidersThrottleProviderConfigReload(t *testing.T) { func TestListenProvidersSkipsEmptyConfigs(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() pvd := &mockProvider{ messages: []dynamic.Message{{ProviderName: "mock"}}, @@ -299,7 +301,9 @@ func TestListenProvidersSkipsEmptyConfigs(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // give some time so that the configuration can be processed time.Sleep(100 * time.Millisecond) @@ -307,7 +311,6 @@ func TestListenProvidersSkipsEmptyConfigs(t *testing.T) { func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() message := dynamic.Message{ ProviderName: "mock", @@ -331,7 +334,9 @@ func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // give some time so that the configuration can be processed time.Sleep(100 * time.Millisecond) @@ -340,7 +345,6 @@ func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) { func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() configuration := &dynamic.Configuration{ HTTP: th.BuildConfiguration( @@ -374,7 +378,9 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // give some time so that the configuration can be processed time.Sleep(100 * time.Millisecond) @@ -407,7 +413,6 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) { func TestListenProvidersIgnoreSameConfig(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() configuration := &dynamic.Configuration{ HTTP: th.BuildConfiguration( @@ -453,8 +458,7 @@ func TestListenProvidersIgnoreSameConfig(t *testing.T) { configurationReloads++ lastConfig = conf - // Allows next configurations to be sent by the mock provider - // as soon as the first configuration message is applied. + // Allows next configurations to be sent by the mock provider as soon as the first configuration message is applied. once.Do(func() { pvd.first <- struct{}{} // Wait for all configuration messages to pile in @@ -463,7 +467,9 @@ func TestListenProvidersIgnoreSameConfig(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // Wait long enough time.Sleep(50 * time.Millisecond) @@ -498,7 +504,6 @@ func TestListenProvidersIgnoreSameConfig(t *testing.T) { func TestApplyConfigUnderStress(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() watcher := NewConfigurationWatcher(routinesPool, &mockProvider{}, []string{"defaultEP"}, "") @@ -525,15 +530,16 @@ func TestApplyConfigUnderStress(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) time.Sleep(100 * time.Millisecond) // Ensure that at least two configurations have been applied - // if we simulate being spammed configuration changes by the - // provider(s). - // In theory, checking at least one would be sufficient, but - // checking for two also ensures that we're looping properly, + // if we simulate being spammed configuration changes by the provider(s). + // In theory, checking at least one would be sufficient, + // but checking for two also ensures that we're looping properly, // and that the whole algo holds, etc. t.Log(configurationReloads) assert.GreaterOrEqual(t, configurationReloads, 2) @@ -541,7 +547,6 @@ func TestApplyConfigUnderStress(t *testing.T) { func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() configuration := &dynamic.Configuration{ HTTP: th.BuildConfiguration( @@ -596,7 +601,9 @@ func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // Wait long enough time.Sleep(500 * time.Millisecond) @@ -631,7 +638,6 @@ func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) { func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() configuration := &dynamic.Configuration{ HTTP: th.BuildConfiguration( @@ -656,7 +662,9 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) { }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // give some time so that the configuration can be processed time.Sleep(100 * time.Millisecond) @@ -695,7 +703,6 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) { func TestPublishConfigUpdatedByProvider(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() pvdConfiguration := dynamic.Configuration{ TCP: &dynamic.TCPConfiguration{ @@ -725,12 +732,14 @@ func TestPublishConfigUpdatedByProvider(t *testing.T) { watcher.AddListener(func(configuration dynamic.Configuration) { publishedConfigCount++ - // Update the provider configuration published in next dynamic Message which should trigger a new publish. + // Update the provider configuration published in next dynamic Message which should trigger a new publishing. pvdConfiguration.TCP.Routers["bar"] = &dynamic.TCPRouter{} }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // give some time so that the configuration can be processed. time.Sleep(100 * time.Millisecond) @@ -740,7 +749,6 @@ func TestPublishConfigUpdatedByProvider(t *testing.T) { func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) { routinesPool := safe.NewPool(context.Background()) - defer routinesPool.Stop() pvd := &mockProvider{ wait: 10 * time.Millisecond, @@ -774,13 +782,15 @@ func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) { watcher.AddListener(func(configuration dynamic.Configuration) { publishedConfigCount++ - // Modify the provided configuration. This should not modify the configuration stored in the configuration - // watcher and cause a new publish. + // Modify the provided configuration. + // This should not modify the configuration stored in the configuration watcher and therefore there will be no new publishing. configuration.TCP.Routers["foo@mock"].Rule = "bar" }) watcher.Start() - defer watcher.Stop() + + t.Cleanup(watcher.Stop) + t.Cleanup(routinesPool.Stop) // give some time so that the configuration can be processed. time.Sleep(100 * time.Millisecond) diff --git a/pkg/server/server_entrypoint_tcp_test.go b/pkg/server/server_entrypoint_tcp_test.go index 73cc9c28b..342d7d9fa 100644 --- a/pkg/server/server_entrypoint_tcp_test.go +++ b/pkg/server/server_entrypoint_tcp_test.go @@ -48,18 +48,13 @@ func TestShutdownTCP(t *testing.T) { require.NoError(t, err) err = router.AddRoute("HostSNI(`*`)", 0, tcp.HandlerFunc(func(conn tcp.WriteCloser) { - for { - _, err := http.ReadRequest(bufio.NewReader(conn)) - - if errors.Is(err, io.EOF) || (err != nil && errors.Is(err, net.ErrClosed)) { - return - } - require.NoError(t, err) - - resp := http.Response{StatusCode: http.StatusOK} - err = resp.Write(conn) - require.NoError(t, err) + _, err := http.ReadRequest(bufio.NewReader(conn)) + if err != nil { + return } + + resp := http.Response{StatusCode: http.StatusOK} + _ = resp.Write(conn) })) require.NoError(t, err) @@ -89,6 +84,7 @@ func testShutdown(t *testing.T, router *tcprouter.Router) { conn, err := startEntrypoint(entryPoint, router) require.NoError(t, err) + t.Cleanup(func() { _ = conn.Close() }) epAddr := entryPoint.listener.Addr().String() @@ -97,14 +93,14 @@ func testShutdown(t *testing.T, router *tcprouter.Router) { time.Sleep(100 * time.Millisecond) - // We need to do a write on the conn before the shutdown to make it "exist". + // We need to do a write on conn before the shutdown to make it "exist". // Because the connection indeed exists as far as TCP is concerned, // but since we only pass it along to the HTTP server after at least one byte is peeked, // the HTTP server (and hence its shutdown) does not know about the connection until that first byte peeked. err = request.Write(conn) require.NoError(t, err) - reader := bufio.NewReader(conn) + reader := bufio.NewReaderSize(conn, 1) // Wait for first byte in response. _, err = reader.Peek(1) require.NoError(t, err) diff --git a/pkg/server/server_entrypoint_udp_test.go b/pkg/server/server_entrypoint_udp_test.go index f219cd98f..0396f434f 100644 --- a/pkg/server/server_entrypoint_udp_test.go +++ b/pkg/server/server_entrypoint_udp_test.go @@ -32,16 +32,19 @@ func TestShutdownUDPConn(t *testing.T) { for { b := make([]byte, 1024*1024) n, err := conn.Read(b) - require.NoError(t, err) - // We control the termination, otherwise we would block on the Read above, until - // conn is closed by a timeout. Which means we would get an error, and even though - // we are in a goroutine and the current test might be over, go test would still - // yell at us if this happens while other tests are still running. + if err != nil { + return + } + + // We control the termination, otherwise we would block on the Read above, + // until conn is closed by a timeout. + // Which means we would get an error, + // and even though we are in a goroutine and the current test might be over, + // go test would still yell at us if this happens while other tests are still running. if string(b[:n]) == "CLOSE" { return } - _, err = conn.Write(b[:n]) - require.NoError(t, err) + _, _ = conn.Write(b[:n]) } })) @@ -68,9 +71,9 @@ func TestShutdownUDPConn(t *testing.T) { // Packet is accepted, but dropped require.NoError(t, err) - // Make sure that our session is yet again still live. This is specifically to - // make sure we don't create a regression in listener's readLoop, i.e. that we only - // terminate the listener's readLoop goroutine by closing its pConn. + // Make sure that our session is yet again still live. + // This is specifically to make sure we don't create a regression in listener's readLoop, + // i.e. that we only terminate the listener's readLoop goroutine by closing its pConn. requireEcho(t, "TEST3", conn, time.Second) done := make(chan bool) @@ -101,10 +104,11 @@ func TestShutdownUDPConn(t *testing.T) { } } -// requireEcho tests that the conn session is live and functional, by writing -// data through it, and expecting the same data as a response when reading on it. -// It fatals if the read blocks longer than timeout, which is useful to detect -// regressions that would make a test wait forever. +// requireEcho tests that conn session is live and functional, +// by writing data through it, +// and expecting the same data as a response when reading on it. +// It fatals if the read blocks longer than timeout, +// which is useful to detect regressions that would make a test wait forever. func requireEcho(t *testing.T, data string, conn io.ReadWriter, timeout time.Duration) { t.Helper()