traefik/pkg/middlewares/tcp/inflightconn/inflight_conn.go

91 lines
2.1 KiB
Go
Raw Normal View History

2022-10-26 15:42:07 +00:00
package inflightconn
import (
"context"
"fmt"
"net"
"sync"
2023-02-03 14:24:05 +00:00
"github.com/traefik/traefik/v3/pkg/config/dynamic"
"github.com/traefik/traefik/v3/pkg/middlewares"
"github.com/traefik/traefik/v3/pkg/tcp"
)
// typeName is the middleware type label handed to middlewares.GetLogger,
// together with the middleware name, when building this middleware's logger.
const typeName = "InFlightConnTCP"
// inFlightConn is a TCP handler that caps how many connections a single remote
// IP may have in flight at once before they are passed on to next.
type inFlightConn struct {
	// name is the middleware name; it is used when building the logger.
	name string
	// next is the handler that admitted connections are forwarded to.
	next tcp.Handler
	// maxConnections is the highest number of simultaneous connections allowed
	// for one remote IP.
	maxConnections int64

	// mu guards connections.
	mu sync.Mutex
	// connections holds the current number of connections, keyed by remote IP.
	connections map[string]int64
}
// New creates a max connections middleware.
// The connections are identified and grouped by remote IP.
func New(ctx context.Context, next tcp.Handler, config dynamic.TCPInFlightConn, name string) (tcp.Handler, error) {
2022-11-21 17:36:05 +00:00
logger := middlewares.GetLogger(ctx, name, typeName)
logger.Debug().Msg("Creating middleware")
return &inFlightConn{
name: name,
next: next,
connections: make(map[string]int64),
maxConnections: config.Amount,
}, nil
}
// ServeTCP serves the given TCP connection.
func (i *inFlightConn) ServeTCP(conn tcp.WriteCloser) {
2022-11-21 17:36:05 +00:00
logger := middlewares.GetLogger(context.Background(), i.name, typeName)
ip, _, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
2022-11-21 17:36:05 +00:00
logger.Error().Err(err).Msg("Cannot parse IP from remote addr")
conn.Close()
return
}
if err = i.increment(ip); err != nil {
2022-11-21 17:36:05 +00:00
logger.Error().Err(err).Msg("Connection rejected")
conn.Close()
return
}
defer i.decrement(ip)
i.next.ServeTCP(conn)
}
// increment registers one more connection for the given IP.
// When the IP already holds maxConnections connections (or more), the counter
// is left untouched and an error is returned instead.
func (i *inFlightConn) increment(ip string) error {
	i.mu.Lock()
	defer i.mu.Unlock()

	current := i.connections[ip]
	if current < i.maxConnections {
		i.connections[ip] = current + 1
		return nil
	}

	return fmt.Errorf("max number of connections reached for %s", ip)
}
// decrement releases one connection slot for the given IP.
// The counter never goes below zero: calling it for an IP with no tracked
// connections is a no-op.
// When the last connection of an IP is released its map entry is removed, so
// the map only holds IPs that currently have open connections instead of
// growing with every distinct IP ever seen.
func (i *inFlightConn) decrement(ip string) {
	i.mu.Lock()
	defer i.mu.Unlock()

	count := i.connections[ip]
	if count <= 1 {
		// Either nothing is tracked for this IP or this was its last
		// connection. A missing key reads as zero, so dropping the entry is
		// equivalent to storing 0; deleting an absent key is a no-op.
		delete(i.connections, ip)
		return
	}

	i.connections[ip] = count - 1
}