udp: replace concurrently reset timer with ticker
Co-authored-by: Julien Salleyron <julien.salleyron@gmail.com>
This commit is contained in:
parent
9012f2d6b1
commit
63d7ed74f1
5 changed files with 44 additions and 23 deletions
|
@ -101,7 +101,7 @@ func (s *UDPSuite) TestWRR(c *check.C) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-stop:
|
case <-stop:
|
||||||
case <-time.Tick(time.Second * 5):
|
case <-time.Tick(5 * time.Second):
|
||||||
c.Error("Timeout")
|
c.Error("Timeout")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,7 @@ func testShutdown(t *testing.T, router *tcp.Router) {
|
||||||
request, err := http.NewRequest(http.MethodHead, "http://127.0.0.1:8082", nil)
|
request, err := http.NewRequest(http.MethodHead, "http://127.0.0.1:8082", nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 100)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
// We need to do a write on the conn before the shutdown to make it "exist".
|
// We need to do a write on the conn before the shutdown to make it "exist".
|
||||||
// Because the connection indeed exists as far as TCP is concerned,
|
// Because the connection indeed exists as far as TCP is concerned,
|
||||||
|
@ -104,7 +104,7 @@ func testShutdown(t *testing.T, router *tcp.Router) {
|
||||||
loopConn, err := net.Dial("tcp", epAddr)
|
loopConn, err := net.Dial("tcp", epAddr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
loopConn.Close()
|
loopConn.Close()
|
||||||
time.Sleep(time.Millisecond * 100)
|
time.Sleep(100 * time.Millisecond)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !strings.HasSuffix(err.Error(), "connection refused") && !strings.HasSuffix(err.Error(), "reset by peer") {
|
if !strings.HasSuffix(err.Error(), "connection refused") && !strings.HasSuffix(err.Error(), "reset by peer") {
|
||||||
|
@ -135,7 +135,7 @@ func startEntrypoint(entryPoint *TCPEntryPoint, router *tcp.Router) (net.Conn, e
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
conn, err = net.Dial("tcp", entryPoint.listener.Addr().String())
|
conn, err = net.Dial("tcp", entryPoint.listener.Addr().String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
time.Sleep(time.Millisecond * 100)
|
time.Sleep(100 * time.Millisecond)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
epStarted = true
|
epStarted = true
|
||||||
|
@ -150,7 +150,7 @@ func startEntrypoint(entryPoint *TCPEntryPoint, router *tcp.Router) (net.Conn, e
|
||||||
func TestReadTimeoutWithoutFirstByte(t *testing.T) {
|
func TestReadTimeoutWithoutFirstByte(t *testing.T) {
|
||||||
epConfig := &static.EntryPointsTransport{}
|
epConfig := &static.EntryPointsTransport{}
|
||||||
epConfig.SetDefaults()
|
epConfig.SetDefaults()
|
||||||
epConfig.RespondingTimeouts.ReadTimeout = types.Duration(time.Second * 2)
|
epConfig.RespondingTimeouts.ReadTimeout = types.Duration(2 * time.Second)
|
||||||
|
|
||||||
entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{
|
entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{
|
||||||
Address: ":0",
|
Address: ":0",
|
||||||
|
@ -178,7 +178,7 @@ func TestReadTimeoutWithoutFirstByte(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case err := <-errChan:
|
case err := <-errChan:
|
||||||
require.Equal(t, io.EOF, err)
|
require.Equal(t, io.EOF, err)
|
||||||
case <-time.Tick(time.Second * 5):
|
case <-time.Tick(5 * time.Second):
|
||||||
t.Error("Timeout while read")
|
t.Error("Timeout while read")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -186,7 +186,7 @@ func TestReadTimeoutWithoutFirstByte(t *testing.T) {
|
||||||
func TestReadTimeoutWithFirstByte(t *testing.T) {
|
func TestReadTimeoutWithFirstByte(t *testing.T) {
|
||||||
epConfig := &static.EntryPointsTransport{}
|
epConfig := &static.EntryPointsTransport{}
|
||||||
epConfig.SetDefaults()
|
epConfig.SetDefaults()
|
||||||
epConfig.RespondingTimeouts.ReadTimeout = types.Duration(time.Second * 2)
|
epConfig.RespondingTimeouts.ReadTimeout = types.Duration(2 * time.Second)
|
||||||
|
|
||||||
entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{
|
entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{
|
||||||
Address: ":0",
|
Address: ":0",
|
||||||
|
@ -217,7 +217,7 @@ func TestReadTimeoutWithFirstByte(t *testing.T) {
|
||||||
select {
|
select {
|
||||||
case err := <-errChan:
|
case err := <-errChan:
|
||||||
require.Equal(t, io.EOF, err)
|
require.Equal(t, io.EOF, err)
|
||||||
case <-time.Tick(time.Second * 5):
|
case <-time.Tick(5 * time.Second):
|
||||||
t.Error("Timeout while read")
|
t.Error("Timeout while read")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ func TestShutdownUDPConn(t *testing.T) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-doneChan:
|
case <-doneChan:
|
||||||
case <-time.Tick(time.Second * 5):
|
case <-time.Tick(10 * time.Second):
|
||||||
// In case we introduce a regression that would make the test wait forever.
|
// In case we introduce a regression that would make the test wait forever.
|
||||||
t.Fatal("Timeout during shutdown")
|
t.Fatal("Timeout during shutdown")
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,9 @@ const closeRetryInterval = 500 * time.Millisecond
|
||||||
|
|
||||||
// connTimeout determines how long to wait on an idle session,
|
// connTimeout determines how long to wait on an idle session,
|
||||||
// before releasing all resources related to that session.
|
// before releasing all resources related to that session.
|
||||||
const connTimeout = time.Second * 3
|
const connTimeout = 3 * time.Second
|
||||||
|
|
||||||
|
var timeoutTicker = connTimeout / 10
|
||||||
|
|
||||||
var errClosedListener = errors.New("udp: listener closed")
|
var errClosedListener = errors.New("udp: listener closed")
|
||||||
|
|
||||||
|
@ -175,7 +177,7 @@ func (l *Listener) newConn(rAddr net.Addr) *Conn {
|
||||||
readCh: make(chan []byte),
|
readCh: make(chan []byte),
|
||||||
sizeCh: make(chan int),
|
sizeCh: make(chan int),
|
||||||
doneCh: make(chan struct{}),
|
doneCh: make(chan struct{}),
|
||||||
timer: time.NewTimer(connTimeout),
|
ticker: time.NewTicker(timeoutTicker),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,7 +191,10 @@ type Conn struct {
|
||||||
sizeCh chan int // to synchronize with the end of a Read
|
sizeCh chan int // to synchronize with the end of a Read
|
||||||
msgs [][]byte // to store data from listener, to be consumed by Reads
|
msgs [][]byte // to store data from listener, to be consumed by Reads
|
||||||
|
|
||||||
timer *time.Timer // for timeouts
|
muActivity sync.RWMutex
|
||||||
|
lastActivity time.Time // the last time the session saw either read or write activity
|
||||||
|
|
||||||
|
ticker *time.Ticker // for timeouts
|
||||||
doneOnce sync.Once
|
doneOnce sync.Once
|
||||||
doneCh chan struct{}
|
doneCh chan struct{}
|
||||||
}
|
}
|
||||||
|
@ -204,9 +209,15 @@ func (c *Conn) readLoop() {
|
||||||
select {
|
select {
|
||||||
case msg := <-c.receiveCh:
|
case msg := <-c.receiveCh:
|
||||||
c.msgs = append(c.msgs, msg)
|
c.msgs = append(c.msgs, msg)
|
||||||
case <-c.timer.C:
|
case <-c.ticker.C:
|
||||||
c.Close()
|
c.muActivity.RLock()
|
||||||
return
|
deadline := c.lastActivity.Add(connTimeout)
|
||||||
|
c.muActivity.RUnlock()
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
c.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -218,9 +229,14 @@ func (c *Conn) readLoop() {
|
||||||
c.sizeCh <- n
|
c.sizeCh <- n
|
||||||
case msg := <-c.receiveCh:
|
case msg := <-c.receiveCh:
|
||||||
c.msgs = append(c.msgs, msg)
|
c.msgs = append(c.msgs, msg)
|
||||||
case <-c.timer.C:
|
case <-c.ticker.C:
|
||||||
c.Close()
|
c.muActivity.RLock()
|
||||||
return
|
deadline := c.lastActivity.Add(connTimeout)
|
||||||
|
c.muActivity.RUnlock()
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
c.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -230,7 +246,9 @@ func (c *Conn) Read(p []byte) (int, error) {
|
||||||
select {
|
select {
|
||||||
case c.readCh <- p:
|
case c.readCh <- p:
|
||||||
n := <-c.sizeCh
|
n := <-c.sizeCh
|
||||||
c.timer.Reset(connTimeout)
|
c.muActivity.Lock()
|
||||||
|
c.lastActivity = time.Now()
|
||||||
|
c.muActivity.Unlock()
|
||||||
return n, nil
|
return n, nil
|
||||||
case <-c.doneCh:
|
case <-c.doneCh:
|
||||||
return 0, io.EOF
|
return 0, io.EOF
|
||||||
|
@ -244,7 +262,9 @@ func (c *Conn) Write(p []byte) (n int, err error) {
|
||||||
return 0, io.EOF
|
return 0, io.EOF
|
||||||
}
|
}
|
||||||
|
|
||||||
c.timer.Reset(connTimeout)
|
c.muActivity.Lock()
|
||||||
|
c.lastActivity = time.Now()
|
||||||
|
c.muActivity.Unlock()
|
||||||
return l.pConn.WriteTo(p, c.rAddr)
|
return l.pConn.WriteTo(p, c.rAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -261,5 +281,6 @@ func (c *Conn) Close() error {
|
||||||
c.listener.mu.Lock()
|
c.listener.mu.Lock()
|
||||||
defer c.listener.mu.Unlock()
|
defer c.listener.mu.Unlock()
|
||||||
delete(c.listener.conns, c.rAddr.String())
|
delete(c.listener.conns, c.rAddr.String())
|
||||||
|
c.ticker.Stop()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ func TestListenNotBlocking(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// This should not block second call
|
// This should not block second call
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(10 * time.Second)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -148,7 +148,7 @@ func testTimeout(t *testing.T, withRead bool) {
|
||||||
|
|
||||||
assert.Equal(t, 10, len(ln.conns))
|
assert.Equal(t, 10, len(ln.conns))
|
||||||
|
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(4 * time.Second)
|
||||||
assert.Equal(t, 0, len(ln.conns))
|
assert.Equal(t, 0, len(ln.conns))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,7 +239,7 @@ func TestShutdown(t *testing.T) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-doneChan:
|
case <-doneChan:
|
||||||
case <-time.Tick(time.Second * 5):
|
case <-time.Tick(5 * time.Second):
|
||||||
// In case we introduce a regression that would make the test wait forever.
|
// In case we introduce a regression that would make the test wait forever.
|
||||||
t.Fatal("Timeout during shutdown")
|
t.Fatal("Timeout during shutdown")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue