From 93de7cf0c0ed478600021ce81eb487c0d6717f69 Mon Sep 17 00:00:00 2001 From: Tom Moulard Date: Mon, 29 Nov 2021 17:12:06 +0100 Subject: [PATCH] feat: add in flight connection middleware --- docs/content/middlewares/tcp/inflightconn.md | 63 ++++++++++++ docs/content/middlewares/tcp/overview.md | 3 +- .../traefik.containo.us_middlewaretcps.yaml | 7 ++ docs/mkdocs.yml | 1 + integration/fixtures/k8s/01-traefik-crd.yml | 7 ++ pkg/config/dynamic/tcp_middlewares.go | 10 +- pkg/config/dynamic/zz_generated.deepcopy.go | 21 ++++ pkg/config/label/label_test.go | 12 +++ .../tcp/inflightconn/inflight_conn.go | 92 ++++++++++++++++++ .../tcp/inflightconn/inflight_conn_test.go | 95 +++++++++++++++++++ pkg/provider/kubernetes/crd/kubernetes.go | 3 +- .../crd/traefik/v1alpha1/middlewaretcp.go | 3 +- .../traefik/v1alpha1/zz_generated.deepcopy.go | 5 + pkg/server/middleware/tcp/middlewares.go | 8 ++ 14 files changed, 326 insertions(+), 4 deletions(-) create mode 100644 docs/content/middlewares/tcp/inflightconn.md create mode 100644 pkg/middlewares/tcp/inflightconn/inflight_conn.go create mode 100644 pkg/middlewares/tcp/inflightconn/inflight_conn_test.go diff --git a/docs/content/middlewares/tcp/inflightconn.md b/docs/content/middlewares/tcp/inflightconn.md new file mode 100644 index 000000000..73e9e0df0 --- /dev/null +++ b/docs/content/middlewares/tcp/inflightconn.md @@ -0,0 +1,63 @@ +# InFlightConn + +Limiting the Number of Simultaneous connections. +{: .subtitle } + +To proactively prevent services from being overwhelmed with high load, the number of allowed simultaneous connections by IP can be limited. + +## Configuration Examples + +```yaml tab="Docker" +labels: + - "traefik.tcp.middlewares.test-inflightconn.inflightconn.amount=10" +``` + +```yaml tab="Kubernetes" +apiVersion: traefik.containo.us/v1alpha1 +kind: Middleware +metadata: + name: test-inflightconn +spec: + inFlightConn: + amount: 10 +``` + +```yaml tab="Consul Catalog" +# Limiting to 10 simultaneous connections +- "traefik.tcp.middlewares.test-inflightconn.inflightconn.amount=10" +``` + +```json tab="Marathon" +"labels": { + "traefik.tcp.middlewares.test-inflightconn.inflightconn.amount": "10" +} +``` + +```yaml tab="Rancher" +# Limiting to 10 simultaneous connections. +labels: + - "traefik.tcp.middlewares.test-inflightconn.inflightconn.amount=10" +``` + +```yaml tab="File (YAML)" +# Limiting to 10 simultaneous connections. +tcp: + middlewares: + test-inflightconn: + inFlightConn: + amount: 10 +``` + +```toml tab="File (TOML)" +# Limiting to 10 simultaneous connections +[tcp.middlewares] + [tcp.middlewares.test-inflightconn.inFlightConn] + amount = 10 +``` + +## Configuration Options + +### `amount` + +The `amount` option defines the maximum amount of allowed simultaneous connections. +The middleware closes the connection if there are already `amount` connections opened. diff --git a/docs/content/middlewares/tcp/overview.md b/docs/content/middlewares/tcp/overview.md index 310cae1af..1fd53a963 100644 --- a/docs/content/middlewares/tcp/overview.md +++ b/docs/content/middlewares/tcp/overview.md @@ -131,4 +131,5 @@ tcp: | Middleware | Purpose | Area | |-------------------------------------------|---------------------------------------------------|-----------------------------| -| [IPWhiteList](ipwhitelist.md) | Limit the allowed client IPs | Security, Request lifecycle | +| [InFlightConn](inflightconn.md) | Limits the number of simultaneous connections. | Security, Request lifecycle | +| [IPWhiteList](ipwhitelist.md) | Limit the allowed client IPs. | Security, Request lifecycle | diff --git a/docs/content/reference/dynamic-configuration/traefik.containo.us_middlewaretcps.yaml b/docs/content/reference/dynamic-configuration/traefik.containo.us_middlewaretcps.yaml index 29089f20c..35aa98931 100644 --- a/docs/content/reference/dynamic-configuration/traefik.containo.us_middlewaretcps.yaml +++ b/docs/content/reference/dynamic-configuration/traefik.containo.us_middlewaretcps.yaml @@ -36,6 +36,13 @@ spec: spec: description: MiddlewareTCPSpec holds the MiddlewareTCP configuration. properties: + inFlightConn: + description: TCPInFlightConn holds the TCP in flight connection configuration. + properties: + amount: + format: int64 + type: integer + type: object ipWhiteList: description: TCPIPWhiteList holds the TCP ip white list configuration. properties: diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 62029fc6d..e360fec0d 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -132,6 +132,7 @@ nav: - 'StripPrefixRegex': 'middlewares/http/stripprefixregex.md' - 'TCP': - 'Overview': 'middlewares/tcp/overview.md' + - 'InFlightConn': 'middlewares/tcp/inflightconn.md' - 'IpWhitelist': 'middlewares/tcp/ipwhitelist.md' - 'Plugins & Traefik Pilot': 'plugins/index.md' - 'Operations': diff --git a/integration/fixtures/k8s/01-traefik-crd.yml b/integration/fixtures/k8s/01-traefik-crd.yml index fbde10895..0daee3add 100644 --- a/integration/fixtures/k8s/01-traefik-crd.yml +++ b/integration/fixtures/k8s/01-traefik-crd.yml @@ -1050,6 +1050,13 @@ spec: spec: description: MiddlewareTCPSpec holds the MiddlewareTCP configuration. properties: + inFlightConn: + description: TCPInFlightConn holds the TCP in flight connection configuration. + properties: + amount: + format: int64 + type: integer + type: object ipWhiteList: description: TCPIPWhiteList holds the TCP ip white list configuration. properties: diff --git a/pkg/config/dynamic/tcp_middlewares.go b/pkg/config/dynamic/tcp_middlewares.go index 83b2d1169..1108319f5 100644 --- a/pkg/config/dynamic/tcp_middlewares.go +++ b/pkg/config/dynamic/tcp_middlewares.go @@ -4,7 +4,15 @@ package dynamic // TCPMiddleware holds the TCPMiddleware configuration. type TCPMiddleware struct { - IPWhiteList *TCPIPWhiteList `json:"ipWhiteList,omitempty" toml:"ipWhiteList,omitempty" yaml:"ipWhiteList,omitempty" export:"true"` + InFlightConn *TCPInFlightConn `json:"InFlightConn,omitempty" toml:"InFlightConn,omitempty" yaml:"InFlightConn,omitempty" export:"true"` + IPWhiteList *TCPIPWhiteList `json:"ipWhiteList,omitempty" toml:"ipWhiteList,omitempty" yaml:"ipWhiteList,omitempty" export:"true"` +} + +// +k8s:deepcopy-gen=true + +// TCPInFlightConn holds the TCP in flight connection configuration. +type TCPInFlightConn struct { + Amount int64 `json:"amount,omitempty" toml:"amount,omitempty" yaml:"amount,omitempty" export:"true"` } // +k8s:deepcopy-gen=true diff --git a/pkg/config/dynamic/zz_generated.deepcopy.go b/pkg/config/dynamic/zz_generated.deepcopy.go index da72eb9de..783fe55d3 100644 --- a/pkg/config/dynamic/zz_generated.deepcopy.go +++ b/pkg/config/dynamic/zz_generated.deepcopy.go @@ -1350,9 +1350,30 @@ func (in *TCPIPWhiteList) DeepCopy() *TCPIPWhiteList { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TCPInFlightConn) DeepCopyInto(out *TCPInFlightConn) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TCPInFlightConn. +func (in *TCPInFlightConn) DeepCopy() *TCPInFlightConn { + if in == nil { + return nil + } + out := new(TCPInFlightConn) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TCPMiddleware) DeepCopyInto(out *TCPMiddleware) { *out = *in + if in.InFlightConn != nil { + in, out := &in.InFlightConn, &out.InFlightConn + *out = new(TCPInFlightConn) + **out = **in + } if in.IPWhiteList != nil { in, out := &in.IPWhiteList, &out.IPWhiteList *out = new(TCPIPWhiteList) diff --git a/pkg/config/label/label_test.go b/pkg/config/label/label_test.go index 19e6dbd74..4a2126f85 100644 --- a/pkg/config/label/label_test.go +++ b/pkg/config/label/label_test.go @@ -174,6 +174,7 @@ func TestDecodeConfiguration(t *testing.T) { "traefik.http.services.Service1.loadbalancer.sticky.cookie.name": "fui", "traefik.tcp.middlewares.Middleware0.ipwhitelist.sourcerange": "foobar, fiibar", + "traefik.tcp.middlewares.Middleware2.inflightconn.amount": "42", "traefik.tcp.routers.Router0.rule": "foobar", "traefik.tcp.routers.Router0.entrypoints": "foobar, fiibar", "traefik.tcp.routers.Router0.service": "foobar", @@ -236,6 +237,11 @@ func TestDecodeConfiguration(t *testing.T) { SourceRange: []string{"foobar", "fiibar"}, }, }, + "Middleware2": { + InFlightConn: &dynamic.TCPInFlightConn{ + Amount: 42, + }, + }, }, Services: map[string]*dynamic.TCPService{ "Service0": { @@ -719,6 +725,11 @@ func TestEncodeConfiguration(t *testing.T) { SourceRange: []string{"foobar", "fiibar"}, }, }, + "Middleware2": { + InFlightConn: &dynamic.TCPInFlightConn{ + Amount: 42, + }, + }, }, Services: map[string]*dynamic.TCPService{ "Service0": { @@ -1320,6 +1331,7 @@ func TestEncodeConfiguration(t *testing.T) { "traefik.HTTP.Services.Service0.LoadBalancer.HealthCheck.Headers.name0": "foobar", "traefik.TCP.Middlewares.Middleware0.IPWhiteList.SourceRange": "foobar, fiibar", + "traefik.TCP.Middlewares.Middleware2.InFlightConn.Amount": "42", "traefik.TCP.Routers.Router0.Rule": "foobar", "traefik.TCP.Routers.Router0.EntryPoints": "foobar, fiibar", "traefik.TCP.Routers.Router0.Service": "foobar", diff --git a/pkg/middlewares/tcp/inflightconn/inflight_conn.go b/pkg/middlewares/tcp/inflightconn/inflight_conn.go new file mode 100644 index 000000000..dbb602432 --- /dev/null +++ b/pkg/middlewares/tcp/inflightconn/inflight_conn.go @@ -0,0 +1,92 @@ +package tcpinflightconn + +import ( + "context" + "fmt" + "net" + "sync" + + "github.com/traefik/traefik/v2/pkg/config/dynamic" + "github.com/traefik/traefik/v2/pkg/log" + "github.com/traefik/traefik/v2/pkg/middlewares" + "github.com/traefik/traefik/v2/pkg/tcp" +) + +const typeName = "InFlightConnTCP" + +type inFlightConn struct { + name string + next tcp.Handler + maxConnections int64 + + mu sync.Mutex + connections map[string]int64 // current number of connections by remote IP. +} + +// New creates a max connections middleware. +// The connections are identified and grouped by remote IP. +func New(ctx context.Context, next tcp.Handler, config dynamic.TCPInFlightConn, name string) (tcp.Handler, error) { + logger := log.FromContext(middlewares.GetLoggerCtx(ctx, name, typeName)) + logger.Debug("Creating middleware") + + return &inFlightConn{ + name: name, + next: next, + connections: make(map[string]int64), + maxConnections: config.Amount, + }, nil +} + +// ServeTCP serves the given TCP connection. +func (i *inFlightConn) ServeTCP(conn tcp.WriteCloser) { + ctx := middlewares.GetLoggerCtx(context.Background(), i.name, typeName) + logger := log.FromContext(ctx) + + ip, _, err := net.SplitHostPort(conn.RemoteAddr().String()) + if err != nil { + logger.Errorf("Cannot parse IP from remote addr: %v", err) + conn.Close() + return + } + + if err = i.increment(ip); err != nil { + logger.Errorf("Connection rejected: %v", err) + conn.Close() + return + } + + defer i.decrement(ip) + + i.next.ServeTCP(conn) +} + +// increment increases the counter for the number of connections tracked for the +// given IP. +// It returns an error if the counter would go above the max allowed number of +// connections. +func (i *inFlightConn) increment(ip string) error { + i.mu.Lock() + defer i.mu.Unlock() + + if i.connections[ip] >= i.maxConnections { + return fmt.Errorf("max number of connections reached for %s", ip) + } + + i.connections[ip]++ + + return nil +} + +// decrement decreases the counter for the number of connections tracked for the +// given IP. +// It ensures that the counter does not go below zero. +func (i *inFlightConn) decrement(ip string) { + i.mu.Lock() + defer i.mu.Unlock() + + if i.connections[ip] <= 0 { + return + } + + i.connections[ip]-- +} diff --git a/pkg/middlewares/tcp/inflightconn/inflight_conn_test.go b/pkg/middlewares/tcp/inflightconn/inflight_conn_test.go new file mode 100644 index 000000000..666df4317 --- /dev/null +++ b/pkg/middlewares/tcp/inflightconn/inflight_conn_test.go @@ -0,0 +1,95 @@ +package tcpinflightconn + +import ( + "context" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/traefik/traefik/v2/pkg/config/dynamic" + "github.com/traefik/traefik/v2/pkg/tcp" +) + +func TestInFlightConn_ServeTCP(t *testing.T) { + proceedCh := make(chan struct{}) + waitCh := make(chan struct{}) + finishCh := make(chan struct{}) + + next := tcp.HandlerFunc(func(conn tcp.WriteCloser) { + proceedCh <- struct{}{} + + if fc, ok := conn.(fakeConn); !ok || !fc.wait { + return + } + + <-waitCh + finishCh <- struct{}{} + }) + + middleware, err := New(context.Background(), next, dynamic.TCPInFlightConn{Amount: 1}, "foo") + require.NoError(t, err) + + // The first connection should succeed and wait. + go middleware.ServeTCP(fakeConn{addr: "127.0.0.1:9000", wait: true}) + requireMessage(t, proceedCh) + + closeCh := make(chan struct{}) + + // The second connection from the same remote address should be closed as the maximum number of connections is exceeded. + go middleware.ServeTCP(fakeConn{addr: "127.0.0.1:9000", closeCh: closeCh}) + requireMessage(t, closeCh) + + // The connection from another remote address should succeed. + go middleware.ServeTCP(fakeConn{addr: "127.0.0.2:9000"}) + requireMessage(t, proceedCh) + + // Once the first connection is closed, next connection with the same remote address should succeed. + close(waitCh) + requireMessage(t, finishCh) + + go middleware.ServeTCP(fakeConn{addr: "127.0.0.1:9000"}) + requireMessage(t, proceedCh) +} + +func requireMessage(t *testing.T, c chan struct{}) { + t.Helper() + select { + case <-c: + case <-time.After(time.Second): + t.Fatal("Timeout waiting for message") + } +} + +type fakeConn struct { + net.Conn + + addr string + wait bool + closeCh chan struct{} +} + +func (c fakeConn) RemoteAddr() net.Addr { + return fakeAddr{addr: c.addr} +} + +func (c fakeConn) Close() error { + close(c.closeCh) + return nil +} + +func (c fakeConn) CloseWrite() error { + panic("implement me") +} + +type fakeAddr struct { + addr string +} + +func (a fakeAddr) Network() string { + return "tcp" +} + +func (a fakeAddr) String() string { + return a.addr +} diff --git a/pkg/provider/kubernetes/crd/kubernetes.go b/pkg/provider/kubernetes/crd/kubernetes.go index 5a63d80c0..f8a3679e4 100644 --- a/pkg/provider/kubernetes/crd/kubernetes.go +++ b/pkg/provider/kubernetes/crd/kubernetes.go @@ -273,7 +273,8 @@ func (p *Provider) loadConfigurationFromCRD(ctx context.Context, client Client) id := provider.Normalize(makeID(middlewareTCP.Namespace, middlewareTCP.Name)) conf.TCP.Middlewares[id] = &dynamic.TCPMiddleware{ - IPWhiteList: middlewareTCP.Spec.IPWhiteList, + InFlightConn: middlewareTCP.Spec.InFlightConn, + IPWhiteList: middlewareTCP.Spec.IPWhiteList, } } diff --git a/pkg/provider/kubernetes/crd/traefik/v1alpha1/middlewaretcp.go b/pkg/provider/kubernetes/crd/traefik/v1alpha1/middlewaretcp.go index 27a796ad6..ff42921d0 100644 --- a/pkg/provider/kubernetes/crd/traefik/v1alpha1/middlewaretcp.go +++ b/pkg/provider/kubernetes/crd/traefik/v1alpha1/middlewaretcp.go @@ -20,7 +20,8 @@ type MiddlewareTCP struct { // MiddlewareTCPSpec holds the MiddlewareTCP configuration. type MiddlewareTCPSpec struct { - IPWhiteList *dynamic.TCPIPWhiteList `json:"ipWhiteList,omitempty"` + InFlightConn *dynamic.TCPInFlightConn `json:"inFlightConn,omitempty"` + IPWhiteList *dynamic.TCPIPWhiteList `json:"ipWhiteList,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/provider/kubernetes/crd/traefik/v1alpha1/zz_generated.deepcopy.go b/pkg/provider/kubernetes/crd/traefik/v1alpha1/zz_generated.deepcopy.go index c82f45868..2b463d97a 100644 --- a/pkg/provider/kubernetes/crd/traefik/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/provider/kubernetes/crd/traefik/v1alpha1/zz_generated.deepcopy.go @@ -820,6 +820,11 @@ func (in *MiddlewareTCPList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MiddlewareTCPSpec) DeepCopyInto(out *MiddlewareTCPSpec) { *out = *in + if in.InFlightConn != nil { + in, out := &in.InFlightConn, &out.InFlightConn + *out = new(dynamic.TCPInFlightConn) + **out = **in + } if in.IPWhiteList != nil { in, out := &in.IPWhiteList, &out.IPWhiteList *out = new(dynamic.TCPIPWhiteList) diff --git a/pkg/server/middleware/tcp/middlewares.go b/pkg/server/middleware/tcp/middlewares.go index 4ea1697bb..7e1ad1e17 100644 --- a/pkg/server/middleware/tcp/middlewares.go +++ b/pkg/server/middleware/tcp/middlewares.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/traefik/traefik/v2/pkg/config/runtime" + inflightconn "github.com/traefik/traefik/v2/pkg/middlewares/tcp/inflightconn" ipwhitelist "github.com/traefik/traefik/v2/pkg/middlewares/tcp/ipwhitelist" "github.com/traefik/traefik/v2/pkg/server/provider" "github.com/traefik/traefik/v2/pkg/tcp" @@ -86,6 +87,13 @@ func (b *Builder) buildConstructor(ctx context.Context, middlewareName string) ( var middleware tcp.Constructor + // InFlightConn + if config.InFlightConn != nil { + middleware = func(next tcp.Handler) (tcp.Handler, error) { + return inflightconn.New(ctx, next, *config.InFlightConn, middlewareName) + } + } + // IPWhiteList if config.IPWhiteList != nil { middleware = func(next tcp.Handler) (tcp.Handler, error) {