fix: increase UDP read buffer length to max datagram size

Co-authored-by: Tom Moulard <tom.moulard@traefik.io>
This commit is contained in:
Kevin Pollet 2021-11-09 15:12:07 +01:00 committed by GitHub
parent 9df053e3f5
commit db4a92d877
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 137 additions and 18 deletions

View file

@ -8,7 +8,8 @@ import (
"time" "time"
) )
const receiveMTU = 8192 // maxDatagramSize is the maximum size of a UDP datagram.
const maxDatagramSize = 65535
const closeRetryInterval = 500 * time.Millisecond const closeRetryInterval = 500 * time.Millisecond
@ -135,7 +136,8 @@ func (l *Listener) readLoop() {
// Allocating a new buffer for every read avoids // Allocating a new buffer for every read avoids
// overwriting data in c.msgs in case the next packet is received // overwriting data in c.msgs in case the next packet is received
// before c.msgs is emptied via Read() // before c.msgs is emptied via Read()
buf := make([]byte, receiveMTU) buf := make([]byte, maxDatagramSize)
n, raddr, err := l.pConn.ReadFrom(buf) n, raddr, err := l.pConn.ReadFrom(buf)
if err != nil { if err != nil {
return return
@ -144,6 +146,7 @@ func (l *Listener) readLoop() {
if err != nil { if err != nil {
continue continue
} }
select { select {
case conn.receiveCh <- buf[:n]: case conn.receiveCh <- buf[:n]:
case <-conn.doneCh: case <-conn.doneCh:
@ -249,7 +252,9 @@ func (c *Conn) readLoop() {
} }
} }
// Read implements io.Reader for a Conn. // Read reads up to len(p) bytes into p from the connection.
// Each call corresponds to at most one datagram.
// If p is smaller than the datagram, the extra bytes will be discarded.
func (c *Conn) Read(p []byte) (int, error) { func (c *Conn) Read(p []byte) (int, error) {
select { select {
case c.readCh <- p: case c.readCh <- p:
@ -258,22 +263,21 @@ func (c *Conn) Read(p []byte) (int, error) {
c.lastActivity = time.Now() c.lastActivity = time.Now()
c.muActivity.Unlock() c.muActivity.Unlock()
return n, nil return n, nil
case <-c.doneCh: case <-c.doneCh:
return 0, io.EOF return 0, io.EOF
} }
} }
// Write implements io.Writer for a Conn. // Write writes len(p) bytes from p to the underlying connection.
// Each call sends at most one datagram.
// It is an error to send a message larger than the system's max UDP datagram size.
func (c *Conn) Write(p []byte) (n int, err error) { func (c *Conn) Write(p []byte) (n int, err error) {
l := c.listener
if l == nil {
return 0, io.EOF
}
c.muActivity.Lock() c.muActivity.Lock()
c.lastActivity = time.Now() c.lastActivity = time.Now()
c.muActivity.Unlock() c.muActivity.Unlock()
return l.pConn.WriteTo(p, c.rAddr)
return c.listener.pConn.WriteTo(p, c.rAddr)
} }
func (c *Conn) close() { func (c *Conn) close() {

View file

@ -1,9 +1,11 @@
package udp package udp
import ( import (
"crypto/rand"
"errors" "errors"
"io" "io"
"net" "net"
"runtime"
"testing" "testing"
"time" "time"
@ -317,6 +319,61 @@ func TestShutdown(t *testing.T) {
} }
} }
func TestReadLoopMaxDataSize(t *testing.T) {
if runtime.GOOS == "darwin" {
// sudo sysctl -w net.inet.udp.maxdgram=65507
t.Skip("Skip test on darwin as the maximum dgram size is set to 9216 bytes by default")
}
// Theoretical maximum size of data in a UDP datagram.
// 65535 8 (UDP header) 20 (IP header).
dataSize := 65507
doneCh := make(chan struct{})
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)
l, err := Listen("udp", addr, 3*time.Second)
require.NoError(t, err)
defer func() {
err := l.Close()
require.NoError(t, err)
}()
go func() {
defer close(doneCh)
conn, err := l.Accept()
require.NoError(t, err)
buffer := make([]byte, dataSize)
n, err := conn.Read(buffer)
require.NoError(t, err)
assert.Equal(t, dataSize, n)
}()
c, err := net.Dial("udp", l.Addr().String())
require.NoError(t, err)
data := make([]byte, dataSize)
_, err = rand.Read(data)
require.NoError(t, err)
_, err = c.Write(data)
require.NoError(t, err)
select {
case <-doneCh:
case <-time.Tick(5 * time.Second):
t.Fatal("Timeout waiting for datagram read")
}
}
// requireEcho tests that the conn session is live and functional, // requireEcho tests that the conn session is live and functional,
// by writing data through it, and expecting the same data as a response when reading on it. // by writing data through it, and expecting the same data as a response when reading on it.
// It fatals if the read blocks longer than timeout, // It fatals if the read blocks longer than timeout,

View file

@ -20,14 +20,14 @@ func NewProxy(address string) (*Proxy, error) {
// ServeUDP implements the Handler interface. // ServeUDP implements the Handler interface.
func (p *Proxy) ServeUDP(conn *Conn) { func (p *Proxy) ServeUDP(conn *Conn) {
log.Debugf("Handling connection from %s", conn.rAddr) log.WithoutContext().Debugf("Handling connection from %s", conn.rAddr)
// needed because of e.g. server.trackedConnection // needed because of e.g. server.trackedConnection
defer conn.Close() defer conn.Close()
connBackend, err := net.Dial("udp", p.target) connBackend, err := net.Dial("udp", p.target)
if err != nil { if err != nil {
log.Errorf("Error while connecting to backend: %v", err) log.WithoutContext().Errorf("Error while connecting to backend: %v", err)
return return
} }
@ -35,8 +35,8 @@ func (p *Proxy) ServeUDP(conn *Conn) {
defer connBackend.Close() defer connBackend.Close()
errChan := make(chan error) errChan := make(chan error)
go p.connCopy(conn, connBackend, errChan) go connCopy(conn, connBackend, errChan)
go p.connCopy(connBackend, conn, errChan) go connCopy(connBackend, conn, errChan)
err = <-errChan err = <-errChan
if err != nil { if err != nil {
@ -46,8 +46,12 @@ func (p *Proxy) ServeUDP(conn *Conn) {
<-errChan <-errChan
} }
func (p Proxy) connCopy(dst io.WriteCloser, src io.Reader, errCh chan error) { func connCopy(dst io.WriteCloser, src io.Reader, errCh chan error) {
_, err := io.Copy(dst, src) // The buffer is initialized to the maximum UDP datagram size,
// to make sure that the whole UDP datagram is read or written atomically (no data is discarded).
buffer := make([]byte, maxDatagramSize)
_, err := io.CopyBuffer(dst, src, buffer)
errCh <- err errCh <- err
if err := dst.Close(); err != nil { if err := dst.Close(); err != nil {

View file

@ -1,7 +1,9 @@
package udp package udp
import ( import (
"crypto/rand"
"net" "net"
"runtime"
"testing" "testing"
"time" "time"
@ -9,13 +11,14 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestUDPProxy(t *testing.T) { func TestProxy_ServeUDP(t *testing.T) {
backendAddr := ":8081" backendAddr := ":8081"
go newServer(t, ":8081", HandlerFunc(func(conn *Conn) { go newServer(t, backendAddr, HandlerFunc(func(conn *Conn) {
for { for {
b := make([]byte, 1024*1024) b := make([]byte, 1024*1024)
n, err := conn.Read(b) n, err := conn.Read(b)
require.NoError(t, err) require.NoError(t, err)
_, err = conn.Write(b[:n]) _, err = conn.Write(b[:n])
require.NoError(t, err) require.NoError(t, err)
} }
@ -28,6 +31,7 @@ func TestUDPProxy(t *testing.T) {
go newServer(t, proxyAddr, proxy) go newServer(t, proxyAddr, proxy)
time.Sleep(time.Second) time.Sleep(time.Second)
udpConn, err := net.Dial("udp", proxyAddr) udpConn, err := net.Dial("udp", proxyAddr)
require.NoError(t, err) require.NoError(t, err)
@ -37,9 +41,58 @@ func TestUDPProxy(t *testing.T) {
b := make([]byte, 1024*1024) b := make([]byte, 1024*1024)
n, err := udpConn.Read(b) n, err := udpConn.Read(b)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "DATAWRITE", string(b[:n])) assert.Equal(t, "DATAWRITE", string(b[:n]))
} }
func TestProxy_ServeUDP_MaxDataSize(t *testing.T) {
if runtime.GOOS == "darwin" {
// sudo sysctl -w net.inet.udp.maxdgram=65507
t.Skip("Skip test on darwin as the maximum dgram size is set to 9216 bytes by default")
}
// Theoretical maximum size of data in a UDP datagram.
// 65535 8 (UDP header) 20 (IP header).
dataSize := 65507
backendAddr := ":8083"
go newServer(t, backendAddr, HandlerFunc(func(conn *Conn) {
buffer := make([]byte, dataSize)
n, err := conn.Read(buffer)
require.NoError(t, err)
_, err = conn.Write(buffer[:n])
require.NoError(t, err)
}))
proxy, err := NewProxy(backendAddr)
require.NoError(t, err)
proxyAddr := ":8082"
go newServer(t, proxyAddr, proxy)
time.Sleep(time.Second)
udpConn, err := net.Dial("udp", proxyAddr)
require.NoError(t, err)
want := make([]byte, dataSize)
_, err = rand.Read(want)
require.NoError(t, err)
_, err = udpConn.Write(want)
require.NoError(t, err)
got := make([]byte, dataSize)
_, err = udpConn.Read(got)
require.NoError(t, err)
assert.Equal(t, want, got)
}
func newServer(t *testing.T, addr string, handler Handler) { func newServer(t *testing.T, addr string, handler Handler) {
t.Helper() t.Helper()
@ -52,6 +105,7 @@ func newServer(t *testing.T, addr string, handler Handler) {
for { for {
conn, err := listener.Accept() conn, err := listener.Accept()
require.NoError(t, err) require.NoError(t, err)
go handler.ServeUDP(conn) go handler.ServeUDP(conn)
} }
} }