From 63d7ed74f11853b5fa249b3f7f5e366ced3826d3 Mon Sep 17 00:00:00 2001 From: mpl Date: Wed, 18 Mar 2020 14:50:06 +0100 Subject: [PATCH] udp: replace concurrently reset timer with ticker Co-authored-by: Julien Salleyron --- integration/udp_test.go | 2 +- pkg/server/server_entrypoint_tcp_test.go | 14 ++++---- pkg/server/server_entrypoint_udp_test.go | 2 +- pkg/udp/conn.go | 43 ++++++++++++++++++------ pkg/udp/conn_test.go | 6 ++-- 5 files changed, 44 insertions(+), 23 deletions(-) diff --git a/integration/udp_test.go b/integration/udp_test.go index ec9214e55..0e4265cd3 100644 --- a/integration/udp_test.go +++ b/integration/udp_test.go @@ -101,7 +101,7 @@ func (s *UDPSuite) TestWRR(c *check.C) { select { case <-stop: - case <-time.Tick(time.Second * 5): + case <-time.Tick(5 * time.Second): c.Error("Timeout") } } diff --git a/pkg/server/server_entrypoint_tcp_test.go b/pkg/server/server_entrypoint_tcp_test.go index 8da063281..b35dce36f 100644 --- a/pkg/server/server_entrypoint_tcp_test.go +++ b/pkg/server/server_entrypoint_tcp_test.go @@ -84,7 +84,7 @@ func testShutdown(t *testing.T, router *tcp.Router) { request, err := http.NewRequest(http.MethodHead, "http://127.0.0.1:8082", nil) require.NoError(t, err) - time.Sleep(time.Millisecond * 100) + time.Sleep(100 * time.Millisecond) // We need to do a write on the conn before the shutdown to make it "exist". // Because the connection indeed exists as far as TCP is concerned, @@ -104,7 +104,7 @@ func testShutdown(t *testing.T, router *tcp.Router) { loopConn, err := net.Dial("tcp", epAddr) if err == nil { loopConn.Close() - time.Sleep(time.Millisecond * 100) + time.Sleep(100 * time.Millisecond) continue } if !strings.HasSuffix(err.Error(), "connection refused") && !strings.HasSuffix(err.Error(), "reset by peer") { @@ -135,7 +135,7 @@ func startEntrypoint(entryPoint *TCPEntryPoint, router *tcp.Router) (net.Conn, e for i := 0; i < 10; i++ { conn, err = net.Dial("tcp", entryPoint.listener.Addr().String()) if err != nil { - time.Sleep(time.Millisecond * 100) + time.Sleep(100 * time.Millisecond) continue } epStarted = true @@ -150,7 +150,7 @@ func startEntrypoint(entryPoint *TCPEntryPoint, router *tcp.Router) (net.Conn, e func TestReadTimeoutWithoutFirstByte(t *testing.T) { epConfig := &static.EntryPointsTransport{} epConfig.SetDefaults() - epConfig.RespondingTimeouts.ReadTimeout = types.Duration(time.Second * 2) + epConfig.RespondingTimeouts.ReadTimeout = types.Duration(2 * time.Second) entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{ Address: ":0", @@ -178,7 +178,7 @@ func TestReadTimeoutWithoutFirstByte(t *testing.T) { select { case err := <-errChan: require.Equal(t, io.EOF, err) - case <-time.Tick(time.Second * 5): + case <-time.Tick(5 * time.Second): t.Error("Timeout while read") } } @@ -186,7 +186,7 @@ func TestReadTimeoutWithoutFirstByte(t *testing.T) { func TestReadTimeoutWithFirstByte(t *testing.T) { epConfig := &static.EntryPointsTransport{} epConfig.SetDefaults() - epConfig.RespondingTimeouts.ReadTimeout = types.Duration(time.Second * 2) + epConfig.RespondingTimeouts.ReadTimeout = types.Duration(2 * time.Second) entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{ Address: ":0", @@ -217,7 +217,7 @@ func TestReadTimeoutWithFirstByte(t *testing.T) { select { case err := <-errChan: require.Equal(t, io.EOF, err) - case <-time.Tick(time.Second * 5): + case <-time.Tick(5 * time.Second): t.Error("Timeout while read") } } diff --git a/pkg/server/server_entrypoint_udp_test.go b/pkg/server/server_entrypoint_udp_test.go index 06861bc8d..6f468e901 100644 --- a/pkg/server/server_entrypoint_udp_test.go +++ b/pkg/server/server_entrypoint_udp_test.go @@ -92,7 +92,7 @@ func TestShutdownUDPConn(t *testing.T) { select { case <-doneChan: - case <-time.Tick(time.Second * 5): + case <-time.Tick(10 * time.Second): // In case we introduce a regression that would make the test wait forever. t.Fatal("Timeout during shutdown") } diff --git a/pkg/udp/conn.go b/pkg/udp/conn.go index 4b2f0adcc..0fec8ce04 100644 --- a/pkg/udp/conn.go +++ b/pkg/udp/conn.go @@ -14,7 +14,9 @@ const closeRetryInterval = 500 * time.Millisecond // connTimeout determines how long to wait on an idle session, // before releasing all resources related to that session. -const connTimeout = time.Second * 3 +const connTimeout = 3 * time.Second + +var timeoutTicker = connTimeout / 10 var errClosedListener = errors.New("udp: listener closed") @@ -175,7 +177,7 @@ func (l *Listener) newConn(rAddr net.Addr) *Conn { readCh: make(chan []byte), sizeCh: make(chan int), doneCh: make(chan struct{}), - timer: time.NewTimer(connTimeout), + ticker: time.NewTicker(timeoutTicker), } } @@ -189,7 +191,10 @@ type Conn struct { sizeCh chan int // to synchronize with the end of a Read msgs [][]byte // to store data from listener, to be consumed by Reads - timer *time.Timer // for timeouts + muActivity sync.RWMutex + lastActivity time.Time // the last time the session saw either read or write activity + + ticker *time.Ticker // for timeouts doneOnce sync.Once doneCh chan struct{} } @@ -204,9 +209,15 @@ func (c *Conn) readLoop() { select { case msg := <-c.receiveCh: c.msgs = append(c.msgs, msg) - case <-c.timer.C: - c.Close() - return + case <-c.ticker.C: + c.muActivity.RLock() + deadline := c.lastActivity.Add(connTimeout) + c.muActivity.RUnlock() + if time.Now().After(deadline) { + c.Close() + return + } + continue } } @@ -218,9 +229,14 @@ func (c *Conn) readLoop() { c.sizeCh <- n case msg := <-c.receiveCh: c.msgs = append(c.msgs, msg) - case <-c.timer.C: - c.Close() - return + case <-c.ticker.C: + c.muActivity.RLock() + deadline := c.lastActivity.Add(connTimeout) + c.muActivity.RUnlock() + if time.Now().After(deadline) { + c.Close() + return + } } } } @@ -230,7 +246,9 @@ func (c *Conn) Read(p []byte) (int, error) { select { case c.readCh <- p: n := <-c.sizeCh - c.timer.Reset(connTimeout) + c.muActivity.Lock() + c.lastActivity = time.Now() + c.muActivity.Unlock() return n, nil case <-c.doneCh: return 0, io.EOF @@ -244,7 +262,9 @@ func (c *Conn) Write(p []byte) (n int, err error) { return 0, io.EOF } - c.timer.Reset(connTimeout) + c.muActivity.Lock() + c.lastActivity = time.Now() + c.muActivity.Unlock() return l.pConn.WriteTo(p, c.rAddr) } @@ -261,5 +281,6 @@ func (c *Conn) Close() error { c.listener.mu.Lock() defer c.listener.mu.Unlock() delete(c.listener.conns, c.rAddr.String()) + c.ticker.Stop() return nil } diff --git a/pkg/udp/conn_test.go b/pkg/udp/conn_test.go index 50abfecb7..91d3fe571 100644 --- a/pkg/udp/conn_test.go +++ b/pkg/udp/conn_test.go @@ -43,7 +43,7 @@ func TestListenNotBlocking(t *testing.T) { require.NoError(t, err) // This should not block second call - time.Sleep(time.Second * 10) + time.Sleep(10 * time.Second) }() } }() @@ -148,7 +148,7 @@ func testTimeout(t *testing.T, withRead bool) { assert.Equal(t, 10, len(ln.conns)) - time.Sleep(3 * time.Second) + time.Sleep(4 * time.Second) assert.Equal(t, 0, len(ln.conns)) } @@ -239,7 +239,7 @@ func TestShutdown(t *testing.T) { select { case <-doneChan: - case <-time.Tick(time.Second * 5): + case <-time.Tick(5 * time.Second): // In case we introduce a regression that would make the test wait forever. t.Fatal("Timeout during shutdown") }