From d02be003ab9d37d19cf8e95c58124e7e31b371c6 Mon Sep 17 00:00:00 2001 From: Aofei Sheng Date: Tue, 30 Jan 2024 21:56:05 +0800 Subject: [PATCH] Add SO_REUSEPORT support for EntryPoints --- .../reference/static-configuration/cli-ref.md | 3 + .../reference/static-configuration/env-ref.md | 3 + .../reference/static-configuration/file.toml | 1 + .../reference/static-configuration/file.yaml | 1 + docs/content/routing/entrypoints.md | 73 +++++++++++++++++++ go.mod | 2 +- pkg/config/static/entrypoints.go | 1 + .../server_entrypoint_listenconfig_other.go | 15 ++++ ...rver_entrypoint_listenconfig_other_test.go | 44 +++++++++++ .../server_entrypoint_listenconfig_unix.go | 44 +++++++++++ ...point_listenconfig_unix_sockopt_freebsd.go | 7 ++ ...rypoint_listenconfig_unix_sockopt_other.go | 7 ++ ...erver_entrypoint_listenconfig_unix_test.go | 56 ++++++++++++++ pkg/server/server_entrypoint_tcp.go | 3 +- pkg/server/server_entrypoint_tcp_http3.go | 3 +- pkg/server/server_entrypoint_udp.go | 9 +-- pkg/udp/conn.go | 14 +++- pkg/udp/conn_test.go | 31 ++------ pkg/udp/proxy_test.go | 5 +- 19 files changed, 279 insertions(+), 43 deletions(-) create mode 100644 pkg/server/server_entrypoint_listenconfig_other.go create mode 100644 pkg/server/server_entrypoint_listenconfig_other_test.go create mode 100644 pkg/server/server_entrypoint_listenconfig_unix.go create mode 100644 pkg/server/server_entrypoint_listenconfig_unix_sockopt_freebsd.go create mode 100644 pkg/server/server_entrypoint_listenconfig_unix_sockopt_other.go create mode 100644 pkg/server/server_entrypoint_listenconfig_unix_test.go diff --git a/docs/content/reference/static-configuration/cli-ref.md b/docs/content/reference/static-configuration/cli-ref.md index 2f655ccb1..d2a5db058 100644 --- a/docs/content/reference/static-configuration/cli-ref.md +++ b/docs/content/reference/static-configuration/cli-ref.md @@ -180,6 +180,9 @@ Trust all. (Default: ```false```) `--entrypoints..proxyprotocol.trustedips`: Trust only selected IPs. +`--entrypoints..reuseport`: +Enables EntryPoints from the same or different processes listening on the same TCP/UDP port. (Default: ```false```) + `--entrypoints..transport.keepalivemaxrequests`: Maximum number of requests before closing a keep-alive connection. (Default: ```0```) diff --git a/docs/content/reference/static-configuration/env-ref.md b/docs/content/reference/static-configuration/env-ref.md index fdf733783..cc813b154 100644 --- a/docs/content/reference/static-configuration/env-ref.md +++ b/docs/content/reference/static-configuration/env-ref.md @@ -180,6 +180,9 @@ Trust all. (Default: ```false```) `TRAEFIK_ENTRYPOINTS__PROXYPROTOCOL_TRUSTEDIPS`: Trust only selected IPs. +`TRAEFIK_ENTRYPOINTS__REUSEPORT`: +Enables EntryPoints from the same or different processes listening on the same TCP/UDP port. (Default: ```false```) + `TRAEFIK_ENTRYPOINTS__TRANSPORT_KEEPALIVEMAXREQUESTS`: Maximum number of requests before closing a keep-alive connection. (Default: ```0```) diff --git a/docs/content/reference/static-configuration/file.toml b/docs/content/reference/static-configuration/file.toml index 8f9326416..79226cb70 100644 --- a/docs/content/reference/static-configuration/file.toml +++ b/docs/content/reference/static-configuration/file.toml @@ -30,6 +30,7 @@ [entryPoints] [entryPoints.EntryPoint0] address = "foobar" + reusePort = true asDefault = true [entryPoints.EntryPoint0.transport] keepAliveMaxTime = "42s" diff --git a/docs/content/reference/static-configuration/file.yaml b/docs/content/reference/static-configuration/file.yaml index 1739759cb..45092911f 100644 --- a/docs/content/reference/static-configuration/file.yaml +++ b/docs/content/reference/static-configuration/file.yaml @@ -35,6 +35,7 @@ tcpServersTransport: entryPoints: EntryPoint0: address: foobar + reusePort: true asDefault: true transport: lifeCycle: diff --git a/docs/content/routing/entrypoints.md b/docs/content/routing/entrypoints.md index cd79c3cfd..aa92aa5c9 100644 --- a/docs/content/routing/entrypoints.md +++ b/docs/content/routing/entrypoints.md @@ -233,6 +233,79 @@ If both TCP and UDP are wanted for the same port, two entryPoints definitions ar Full details for how to specify `address` can be found in [net.Listen](https://golang.org/pkg/net/#Listen) (and [net.Dial](https://golang.org/pkg/net/#Dial)) of the doc for go. +### ReusePort + +_Optional, Default=false_ + +The `ReusePort` option enables EntryPoints from the same or different processes +listening on the same TCP/UDP port by utilizing the `SO_REUSEPORT` socket option. +It also allows the kernel to act like a load balancer to distribute incoming +connections between entry points. + +For example, you can use it with the [transport.lifeCycle](#lifecycle) to do +canary deployments against Traefik itself. Like upgrading Traefik version or +reloading the static configuration without any service downtime. + +!!! warning "Supported platforms" + + The `ReusePort` option currently works only on Linux, FreeBSD, OpenBSD and Darwin. + It will be ignored on other platforms. + + There is a known bug in the Linux kernel that may cause unintended TCP connection failures when using the `ReusePort` option. + For more details, see https://lwn.net/Articles/853637/. + +??? example "Listen on the same port" + + ```yaml tab="File (yaml)" + entryPoints: + web: + address: ":80" + reusePort: true + ``` + + ```toml tab="File (TOML)" + [entryPoints.web] + address = ":80" + reusePort = true + ``` + + ```bash tab="CLI" + --entrypoints.web.address=:80 + --entrypoints.web.reusePort=true + ``` + + Now it is possible to run multiple Traefik processes with the same EntryPoint configuration. + +??? example "Listen on the same port but bind to a different host" + + ```yaml tab="File (yaml)" + entryPoints: + web: + address: ":80" + reusePort: true + privateWeb: + address: "192.168.1.2:80" + reusePort: true + ``` + + ```toml tab="File (TOML)" + [entryPoints.web] + address = ":80" + reusePort = true + [entryPoints.privateWeb] + address = "192.168.1.2:80" + reusePort = true + ``` + + ```bash tab="CLI" + --entrypoints.web.address=:80 + --entrypoints.web.reusePort=true + --entrypoints.privateWeb.address=192.168.1.2:80 + --entrypoints.privateWeb.reusePort=true + ``` + + Requests to `192.168.1.2:80` will only be handled by routers that have `privateWeb` as the entry point. + ### AsDefault _Optional, Default=false_ diff --git a/go.mod b/go.mod index 3685c0c5a..b4e712b51 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( golang.org/x/exp v0.0.0-20231006140011-7918f672742d golang.org/x/mod v0.13.0 golang.org/x/net v0.17.0 + golang.org/x/sys v0.15.0 golang.org/x/text v0.13.0 golang.org/x/time v0.3.0 golang.org/x/tools v0.14.0 @@ -315,7 +316,6 @@ require ( golang.org/x/arch v0.4.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/oauth2 v0.13.0 // indirect - golang.org/x/sys v0.15.0 // indirect golang.org/x/term v0.13.0 // indirect google.golang.org/api v0.128.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/pkg/config/static/entrypoints.go b/pkg/config/static/entrypoints.go index 2c1187ac5..d7b155dbe 100644 --- a/pkg/config/static/entrypoints.go +++ b/pkg/config/static/entrypoints.go @@ -12,6 +12,7 @@ import ( // EntryPoint holds the entry point configuration. type EntryPoint struct { Address string `description:"Entry point address." json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty"` + ReusePort bool `description:"Enables EntryPoints from the same or different processes listening on the same TCP/UDP port." json:"reusePort,omitempty" toml:"reusePort,omitempty" yaml:"reusePort,omitempty"` AsDefault bool `description:"Adds this EntryPoint to the list of default EntryPoints to be used on routers that don't have any Entrypoint defined." json:"asDefault,omitempty" toml:"asDefault,omitempty" yaml:"asDefault,omitempty"` Transport *EntryPointsTransport `description:"Configures communication between clients and Traefik." json:"transport,omitempty" toml:"transport,omitempty" yaml:"transport,omitempty" export:"true"` ProxyProtocol *ProxyProtocol `description:"Proxy-Protocol configuration." json:"proxyProtocol,omitempty" toml:"proxyProtocol,omitempty" yaml:"proxyProtocol,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"` diff --git a/pkg/server/server_entrypoint_listenconfig_other.go b/pkg/server/server_entrypoint_listenconfig_other.go new file mode 100644 index 000000000..199012284 --- /dev/null +++ b/pkg/server/server_entrypoint_listenconfig_other.go @@ -0,0 +1,15 @@ +//go:build !(linux || freebsd || openbsd || darwin) + +package server + +import ( + "net" + + "github.com/traefik/traefik/v3/pkg/config/static" +) + +// newListenConfig creates a new net.ListenConfig for the given configuration of +// the entry point. +func newListenConfig(configuration *static.EntryPoint) (lc net.ListenConfig) { + return +} diff --git a/pkg/server/server_entrypoint_listenconfig_other_test.go b/pkg/server/server_entrypoint_listenconfig_other_test.go new file mode 100644 index 000000000..f2c736d26 --- /dev/null +++ b/pkg/server/server_entrypoint_listenconfig_other_test.go @@ -0,0 +1,44 @@ +//go:build !(linux || freebsd || openbsd || darwin) + +package server + +import ( + "context" + "net" + "testing" + + "github.com/stretchr/testify/require" + "github.com/traefik/traefik/v3/pkg/config/static" +) + +func TestNewListenConfig(t *testing.T) { + ep := static.EntryPoint{Address: ":0"} + listenConfig := newListenConfig(&ep) + require.Nil(t, listenConfig.Control) + require.Zero(t, listenConfig.KeepAlive) + + l1, err := listenConfig.Listen(context.Background(), "tcp", ep.Address) + require.NoError(t, err) + require.NotNil(t, l1) + defer l1.Close() + + l2, err := listenConfig.Listen(context.Background(), "tcp", l1.Addr().String()) + require.Error(t, err) + require.ErrorContains(t, err, "address already in use") + require.Nil(t, l2) + + ep = static.EntryPoint{Address: ":0", ReusePort: true} + listenConfig = newListenConfig(&ep) + require.Nil(t, listenConfig.Control) + require.Zero(t, listenConfig.KeepAlive) + + l3, err := listenConfig.Listen(context.Background(), "tcp", ep.Address) + require.NoError(t, err) + require.NotNil(t, l3) + defer l3.Close() + + l4, err := listenConfig.Listen(context.Background(), "tcp", l3.Addr().String()) + require.Error(t, err) + require.ErrorContains(t, err, "address already in use") + require.Nil(t, l4) +} diff --git a/pkg/server/server_entrypoint_listenconfig_unix.go b/pkg/server/server_entrypoint_listenconfig_unix.go new file mode 100644 index 000000000..7e43f4f7d --- /dev/null +++ b/pkg/server/server_entrypoint_listenconfig_unix.go @@ -0,0 +1,44 @@ +//go:build linux || freebsd || openbsd || darwin + +package server + +import ( + "fmt" + "net" + "syscall" + + "github.com/traefik/traefik/v3/pkg/config/static" + "golang.org/x/sys/unix" +) + +// newListenConfig creates a new net.ListenConfig for the given configuration of +// the entry point. +func newListenConfig(configuration *static.EntryPoint) (lc net.ListenConfig) { + if configuration != nil && configuration.ReusePort { + lc.Control = controlReusePort + } + return +} + +// controlReusePort is a net.ListenConfig.Control function that enables SO_REUSEPORT +// on the socket. +func controlReusePort(network, address string, c syscall.RawConn) error { + var setSockOptErr error + err := c.Control(func(fd uintptr) { + // Note that net.ListenConfig enables unix.SO_REUSEADDR by default, + // as seen in https://go.dev/src/net/sockopt_linux.go. Therefore, no + // additional action is required to enable it here. + + setSockOptErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unixSOREUSEPORT, 1) + if setSockOptErr != nil { + return + } + }) + if err != nil { + return fmt.Errorf("control: %w", err) + } + if setSockOptErr != nil { + return fmt.Errorf("setsockopt: %w", setSockOptErr) + } + return nil +} diff --git a/pkg/server/server_entrypoint_listenconfig_unix_sockopt_freebsd.go b/pkg/server/server_entrypoint_listenconfig_unix_sockopt_freebsd.go new file mode 100644 index 000000000..e20b61c48 --- /dev/null +++ b/pkg/server/server_entrypoint_listenconfig_unix_sockopt_freebsd.go @@ -0,0 +1,7 @@ +//go:build freebsd + +package server + +import "golang.org/x/sys/unix" + +const unixSOREUSEPORT = unix.SO_REUSEPORT_LB diff --git a/pkg/server/server_entrypoint_listenconfig_unix_sockopt_other.go b/pkg/server/server_entrypoint_listenconfig_unix_sockopt_other.go new file mode 100644 index 000000000..306dd8bd2 --- /dev/null +++ b/pkg/server/server_entrypoint_listenconfig_unix_sockopt_other.go @@ -0,0 +1,7 @@ +//go:build linux || openbsd || darwin + +package server + +import "golang.org/x/sys/unix" + +const unixSOREUSEPORT = unix.SO_REUSEPORT diff --git a/pkg/server/server_entrypoint_listenconfig_unix_test.go b/pkg/server/server_entrypoint_listenconfig_unix_test.go new file mode 100644 index 000000000..a5f7dda0a --- /dev/null +++ b/pkg/server/server_entrypoint_listenconfig_unix_test.go @@ -0,0 +1,56 @@ +//go:build linux || freebsd || openbsd || darwin + +package server + +import ( + "context" + "net" + "testing" + + "github.com/stretchr/testify/require" + "github.com/traefik/traefik/v3/pkg/config/static" +) + +func TestNewListenConfig(t *testing.T) { + ep := static.EntryPoint{Address: ":0"} + listenConfig := newListenConfig(&ep) + require.Nil(t, listenConfig.Control) + require.Zero(t, listenConfig.KeepAlive) + + l1, err := listenConfig.Listen(context.Background(), "tcp", ep.Address) + require.NoError(t, err) + require.NotNil(t, l1) + defer l1.Close() + + l2, err := listenConfig.Listen(context.Background(), "tcp", l1.Addr().String()) + require.Error(t, err) + require.ErrorContains(t, err, "address already in use") + require.Nil(t, l2) + + ep = static.EntryPoint{Address: ":0", ReusePort: true} + listenConfig = newListenConfig(&ep) + require.NotNil(t, listenConfig.Control) + require.Zero(t, listenConfig.KeepAlive) + + l3, err := listenConfig.Listen(context.Background(), "tcp", ep.Address) + require.NoError(t, err) + require.NotNil(t, l3) + defer l3.Close() + + l4, err := listenConfig.Listen(context.Background(), "tcp", l3.Addr().String()) + require.NoError(t, err) + require.NotNil(t, l4) + defer l4.Close() + + _, l3Port, err := net.SplitHostPort(l3.Addr().String()) + require.NoError(t, err) + l5, err := listenConfig.Listen(context.Background(), "tcp", "127.0.0.1:"+l3Port) + require.NoError(t, err) + require.NotNil(t, l5) + defer l5.Close() + + l6, err := listenConfig.Listen(context.Background(), "tcp", l1.Addr().String()) + require.Error(t, err) + require.ErrorContains(t, err, "address already in use") + require.Nil(t, l6) +} diff --git a/pkg/server/server_entrypoint_tcp.go b/pkg/server/server_entrypoint_tcp.go index e4d2892a8..c7d33abb7 100644 --- a/pkg/server/server_entrypoint_tcp.go +++ b/pkg/server/server_entrypoint_tcp.go @@ -460,7 +460,8 @@ func buildProxyProtocolListener(ctx context.Context, entryPoint *static.EntryPoi } func buildListener(ctx context.Context, entryPoint *static.EntryPoint) (net.Listener, error) { - listener, err := net.Listen("tcp", entryPoint.GetAddress()) + listenConfig := newListenConfig(entryPoint) + listener, err := listenConfig.Listen(ctx, "tcp", entryPoint.GetAddress()) if err != nil { return nil, fmt.Errorf("error opening listener: %w", err) } diff --git a/pkg/server/server_entrypoint_tcp_http3.go b/pkg/server/server_entrypoint_tcp_http3.go index 188be8026..827d6638b 100644 --- a/pkg/server/server_entrypoint_tcp_http3.go +++ b/pkg/server/server_entrypoint_tcp_http3.go @@ -33,7 +33,8 @@ func newHTTP3Server(ctx context.Context, configuration *static.EntryPoint, https return nil, errors.New("advertised port must be greater than or equal to zero") } - conn, err := net.ListenPacket("udp", configuration.GetAddress()) + listenConfig := newListenConfig(configuration) + conn, err := listenConfig.ListenPacket(ctx, "udp", configuration.GetAddress()) if err != nil { return nil, fmt.Errorf("starting listener: %w", err) } diff --git a/pkg/server/server_entrypoint_udp.go b/pkg/server/server_entrypoint_udp.go index 347b169db..08ed4fb1f 100644 --- a/pkg/server/server_entrypoint_udp.go +++ b/pkg/server/server_entrypoint_udp.go @@ -3,7 +3,6 @@ package server import ( "context" "fmt" - "net" "sync" "time" @@ -87,12 +86,8 @@ type UDPEntryPoint struct { // NewUDPEntryPoint returns a UDP entry point. func NewUDPEntryPoint(cfg *static.EntryPoint) (*UDPEntryPoint, error) { - addr, err := net.ResolveUDPAddr("udp", cfg.GetAddress()) - if err != nil { - return nil, err - } - - listener, err := udp.Listen("udp", addr, time.Duration(cfg.UDP.Timeout)) + listenConfig := newListenConfig(cfg) + listener, err := udp.Listen(listenConfig, "udp", cfg.GetAddress(), time.Duration(cfg.UDP.Timeout)) if err != nil { return nil, err } diff --git a/pkg/udp/conn.go b/pkg/udp/conn.go index 5753a9c13..6b6054b69 100644 --- a/pkg/udp/conn.go +++ b/pkg/udp/conn.go @@ -1,7 +1,9 @@ package udp import ( + "context" "errors" + "fmt" "io" "net" "sync" @@ -33,18 +35,22 @@ type Listener struct { } // Listen creates a new listener. -func Listen(network string, laddr *net.UDPAddr, timeout time.Duration) (*Listener, error) { +func Listen(listenConfig net.ListenConfig, network, address string, timeout time.Duration) (*Listener, error) { if timeout <= 0 { return nil, errors.New("timeout should be greater than zero") } - conn, err := net.ListenUDP(network, laddr) + packetConn, err := listenConfig.ListenPacket(context.Background(), network, address) if err != nil { - return nil, err + return nil, fmt.Errorf("listen packet: %w", err) + } + pConn, ok := packetConn.(*net.UDPConn) + if !ok { + return nil, errors.New("packet conn is not an UDPConn") } l := &Listener{ - pConn: conn, + pConn: pConn, acceptCh: make(chan *Conn), conns: make(map[string]*Conn), accepting: true, diff --git a/pkg/udp/conn_test.go b/pkg/udp/conn_test.go index 44e748474..3c8cc0a7f 100644 --- a/pkg/udp/conn_test.go +++ b/pkg/udp/conn_test.go @@ -14,10 +14,7 @@ import ( ) func TestConsecutiveWrites(t *testing.T) { - addr, err := net.ResolveUDPAddr("udp", ":0") - require.NoError(t, err) - - ln, err := Listen("udp", addr, 3*time.Second) + ln, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second) require.NoError(t, err) defer func() { err := ln.Close() @@ -75,11 +72,7 @@ func TestConsecutiveWrites(t *testing.T) { } func TestListenNotBlocking(t *testing.T) { - addr, err := net.ResolveUDPAddr("udp", ":0") - - require.NoError(t, err) - - ln, err := Listen("udp", addr, 3*time.Second) + ln, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second) require.NoError(t, err) defer func() { err := ln.Close() @@ -165,10 +158,7 @@ func TestListenNotBlocking(t *testing.T) { } func TestListenWithZeroTimeout(t *testing.T) { - addr, err := net.ResolveUDPAddr("udp", ":0") - require.NoError(t, err) - - _, err = Listen("udp", addr, 0) + _, err := Listen(net.ListenConfig{}, "udp", ":0", 0) assert.Error(t, err) } @@ -183,10 +173,7 @@ func TestTimeoutWithoutRead(t *testing.T) { func testTimeout(t *testing.T, withRead bool) { t.Helper() - addr, err := net.ResolveUDPAddr("udp", ":0") - require.NoError(t, err) - - ln, err := Listen("udp", addr, 3*time.Second) + ln, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second) require.NoError(t, err) defer func() { err := ln.Close() @@ -227,10 +214,7 @@ func testTimeout(t *testing.T, withRead bool) { } func TestShutdown(t *testing.T) { - addr, err := net.ResolveUDPAddr("udp", ":0") - require.NoError(t, err) - - l, err := Listen("udp", addr, 3*time.Second) + l, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second) require.NoError(t, err) go func() { @@ -331,10 +315,7 @@ func TestReadLoopMaxDataSize(t *testing.T) { doneCh := make(chan struct{}) - addr, err := net.ResolveUDPAddr("udp", ":0") - require.NoError(t, err) - - l, err := Listen("udp", addr, 3*time.Second) + l, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second) require.NoError(t, err) defer func() { diff --git a/pkg/udp/proxy_test.go b/pkg/udp/proxy_test.go index b3ce2ec2c..3b5703875 100644 --- a/pkg/udp/proxy_test.go +++ b/pkg/udp/proxy_test.go @@ -96,10 +96,7 @@ func TestProxy_ServeUDP_MaxDataSize(t *testing.T) { func newServer(t *testing.T, addr string, handler Handler) { t.Helper() - addrL, err := net.ResolveUDPAddr("udp", addr) - require.NoError(t, err) - - listener, err := Listen("udp", addrL, 3*time.Second) + listener, err := Listen(net.ListenConfig{}, "udp", addr, 3*time.Second) require.NoError(t, err) for {