package server import ( "context" "errors" "fmt" stdlog "log" "net" "net/http" "net/url" "os" "strings" "sync" "syscall" "time" "github.com/containous/alice" gokitmetrics "github.com/go-kit/kit/metrics" "github.com/pires/go-proxyproto" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/traefik/traefik/v3/pkg/config/static" "github.com/traefik/traefik/v3/pkg/ip" "github.com/traefik/traefik/v3/pkg/logs" "github.com/traefik/traefik/v3/pkg/metrics" "github.com/traefik/traefik/v3/pkg/middlewares" "github.com/traefik/traefik/v3/pkg/middlewares/contenttype" "github.com/traefik/traefik/v3/pkg/middlewares/forwardedheaders" "github.com/traefik/traefik/v3/pkg/middlewares/requestdecorator" "github.com/traefik/traefik/v3/pkg/safe" "github.com/traefik/traefik/v3/pkg/server/router" tcprouter "github.com/traefik/traefik/v3/pkg/server/router/tcp" "github.com/traefik/traefik/v3/pkg/tcp" "github.com/traefik/traefik/v3/pkg/types" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" ) type httpForwarder struct { net.Listener connChan chan net.Conn errChan chan error } func newHTTPForwarder(ln net.Listener) *httpForwarder { return &httpForwarder{ Listener: ln, connChan: make(chan net.Conn), errChan: make(chan error), } } // ServeTCP uses the connection to serve it later in "Accept". func (h *httpForwarder) ServeTCP(conn tcp.WriteCloser) { h.connChan <- conn } // Accept retrieves a served connection in ServeTCP. func (h *httpForwarder) Accept() (net.Conn, error) { select { case conn := <-h.connChan: return conn, nil case err := <-h.errChan: return nil, err } } // TCPEntryPoints holds a map of TCPEntryPoint (the entrypoint names being the keys). type TCPEntryPoints map[string]*TCPEntryPoint // NewTCPEntryPoints creates a new TCPEntryPoints. func NewTCPEntryPoints(entryPointsConfig static.EntryPoints, hostResolverConfig *types.HostResolverConfig, metricsRegistry metrics.Registry) (TCPEntryPoints, error) { serverEntryPointsTCP := make(TCPEntryPoints) for entryPointName, config := range entryPointsConfig { protocol, err := config.GetProtocol() if err != nil { return nil, fmt.Errorf("error while building entryPoint %s: %w", entryPointName, err) } if protocol != "tcp" { continue } ctx := log.With().Str(logs.EntryPointName, entryPointName).Logger().WithContext(context.Background()) openConnectionsGauge := metricsRegistry. OpenConnectionsGauge(). With("entrypoint", entryPointName, "protocol", "TCP") serverEntryPointsTCP[entryPointName], err = NewTCPEntryPoint(ctx, config, hostResolverConfig, openConnectionsGauge) if err != nil { return nil, fmt.Errorf("error while building entryPoint %s: %w", entryPointName, err) } } return serverEntryPointsTCP, nil } // Start the server entry points. func (eps TCPEntryPoints) Start() { for entryPointName, serverEntryPoint := range eps { ctx := log.With().Str(logs.EntryPointName, entryPointName).Logger().WithContext(context.Background()) go serverEntryPoint.Start(ctx) } } // Stop the server entry points. func (eps TCPEntryPoints) Stop() { var wg sync.WaitGroup for epn, ep := range eps { wg.Add(1) go func(entryPointName string, entryPoint *TCPEntryPoint) { defer wg.Done() logger := log.With().Str(logs.EntryPointName, entryPointName).Logger() entryPoint.Shutdown(logger.WithContext(context.Background())) logger.Debug().Msg("Entrypoint closed") }(epn, ep) } wg.Wait() } // Switch the TCP routers. func (eps TCPEntryPoints) Switch(routersTCP map[string]*tcprouter.Router) { for entryPointName, rt := range routersTCP { eps[entryPointName].SwitchRouter(rt) } } // TCPEntryPoint is the TCP server. type TCPEntryPoint struct { listener net.Listener switcher *tcp.HandlerSwitcher transportConfiguration *static.EntryPointsTransport tracker *connectionTracker httpServer *httpServer httpsServer *httpServer http3Server *http3server } // NewTCPEntryPoint creates a new TCPEntryPoint. func NewTCPEntryPoint(ctx context.Context, configuration *static.EntryPoint, hostResolverConfig *types.HostResolverConfig, openConnectionsGauge gokitmetrics.Gauge) (*TCPEntryPoint, error) { tracker := newConnectionTracker(openConnectionsGauge) listener, err := buildListener(ctx, configuration) if err != nil { return nil, fmt.Errorf("error preparing server: %w", err) } rt := &tcprouter.Router{} reqDecorator := requestdecorator.New(hostResolverConfig) httpServer, err := createHTTPServer(ctx, listener, configuration, true, reqDecorator) if err != nil { return nil, fmt.Errorf("error preparing http server: %w", err) } rt.SetHTTPForwarder(httpServer.Forwarder) httpsServer, err := createHTTPServer(ctx, listener, configuration, false, reqDecorator) if err != nil { return nil, fmt.Errorf("error preparing https server: %w", err) } h3Server, err := newHTTP3Server(ctx, configuration, httpsServer) if err != nil { return nil, fmt.Errorf("error preparing http3 server: %w", err) } rt.SetHTTPSForwarder(httpsServer.Forwarder) tcpSwitcher := &tcp.HandlerSwitcher{} tcpSwitcher.Switch(rt) return &TCPEntryPoint{ listener: listener, switcher: tcpSwitcher, transportConfiguration: configuration.Transport, tracker: tracker, httpServer: httpServer, httpsServer: httpsServer, http3Server: h3Server, }, nil } // Start starts the TCP server. func (e *TCPEntryPoint) Start(ctx context.Context) { logger := log.Ctx(ctx) logger.Debug().Msg("Starting TCP Server") if e.http3Server != nil { go func() { _ = e.http3Server.Start() }() } for { conn, err := e.listener.Accept() if err != nil { logger.Error().Err(err).Send() var opErr *net.OpError if errors.As(err, &opErr) && opErr.Temporary() { continue } var urlErr *url.Error if errors.As(err, &urlErr) && urlErr.Temporary() { continue } e.httpServer.Forwarder.errChan <- err e.httpsServer.Forwarder.errChan <- err return } writeCloser, err := writeCloser(conn) if err != nil { panic(err) } safe.Go(func() { // Enforce read/write deadlines at the connection level, // because when we're peeking the first byte to determine whether we are doing TLS, // the deadlines at the server level are not taken into account. if e.transportConfiguration.RespondingTimeouts.ReadTimeout > 0 { err := writeCloser.SetReadDeadline(time.Now().Add(time.Duration(e.transportConfiguration.RespondingTimeouts.ReadTimeout))) if err != nil { logger.Error().Err(err).Msg("Error while setting read deadline") } } if e.transportConfiguration.RespondingTimeouts.WriteTimeout > 0 { err = writeCloser.SetWriteDeadline(time.Now().Add(time.Duration(e.transportConfiguration.RespondingTimeouts.WriteTimeout))) if err != nil { logger.Error().Err(err).Msg("Error while setting write deadline") } } e.switcher.ServeTCP(newTrackedConnection(writeCloser, e.tracker)) }) } } // Shutdown stops the TCP connections. func (e *TCPEntryPoint) Shutdown(ctx context.Context) { logger := log.Ctx(ctx) reqAcceptGraceTimeOut := time.Duration(e.transportConfiguration.LifeCycle.RequestAcceptGraceTimeout) if reqAcceptGraceTimeOut > 0 { logger.Info().Msgf("Waiting %s for incoming requests to cease", reqAcceptGraceTimeOut) time.Sleep(reqAcceptGraceTimeOut) } graceTimeOut := time.Duration(e.transportConfiguration.LifeCycle.GraceTimeOut) ctx, cancel := context.WithTimeout(ctx, graceTimeOut) logger.Debug().Msgf("Waiting %s seconds before killing connections", graceTimeOut) var wg sync.WaitGroup shutdownServer := func(server stoppable) { defer wg.Done() err := server.Shutdown(ctx) if err == nil { return } if errors.Is(ctx.Err(), context.DeadlineExceeded) { logger.Debug().Err(err).Msg("Server failed to shutdown within deadline") if err = server.Close(); err != nil { logger.Error().Err(err).Send() } return } logger.Error().Err(err).Send() // We expect Close to fail again because Shutdown most likely failed when trying to close a listener. // We still call it however, to make sure that all connections get closed as well. server.Close() } if e.httpServer.Server != nil { wg.Add(1) go shutdownServer(e.httpServer.Server) } if e.httpsServer.Server != nil { wg.Add(1) go shutdownServer(e.httpsServer.Server) if e.http3Server != nil { wg.Add(1) go shutdownServer(e.http3Server) } } if e.tracker != nil { wg.Add(1) go func() { defer wg.Done() err := e.tracker.Shutdown(ctx) if err == nil { return } if errors.Is(ctx.Err(), context.DeadlineExceeded) { logger.Debug().Err(err).Msg("Server failed to shutdown before deadline") } e.tracker.Close() }() } wg.Wait() cancel() } // SwitchRouter switches the TCP router handler. func (e *TCPEntryPoint) SwitchRouter(rt *tcprouter.Router) { rt.SetHTTPForwarder(e.httpServer.Forwarder) httpHandler := rt.GetHTTPHandler() if httpHandler == nil { httpHandler = router.BuildDefaultHTTPRouter() } e.httpServer.Switcher.UpdateHandler(httpHandler) rt.SetHTTPSForwarder(e.httpsServer.Forwarder) httpsHandler := rt.GetHTTPSHandler() if httpsHandler == nil { httpsHandler = router.BuildDefaultHTTPRouter() } e.httpsServer.Switcher.UpdateHandler(httpsHandler) e.switcher.Switch(rt) if e.http3Server != nil { e.http3Server.Switch(rt) } } // writeCloserWrapper wraps together a connection, and the concrete underlying // connection type that was found to satisfy WriteCloser. type writeCloserWrapper struct { net.Conn writeCloser tcp.WriteCloser } func (c *writeCloserWrapper) CloseWrite() error { return c.writeCloser.CloseWrite() } // writeCloser returns the given connection, augmented with the WriteCloser // implementation, if any was found within the underlying conn. func writeCloser(conn net.Conn) (tcp.WriteCloser, error) { switch typedConn := conn.(type) { case *proxyproto.Conn: underlying, ok := typedConn.TCPConn() if !ok { return nil, fmt.Errorf("underlying connection is not a tcp connection") } return &writeCloserWrapper{writeCloser: underlying, Conn: typedConn}, nil case *net.TCPConn: return typedConn, nil default: return nil, fmt.Errorf("unknown connection type %T", typedConn) } } // tcpKeepAliveListener sets TCP keep-alive timeouts on accepted // connections. type tcpKeepAliveListener struct { *net.TCPListener } func (ln tcpKeepAliveListener) Accept() (net.Conn, error) { tc, err := ln.AcceptTCP() if err != nil { return nil, err } if err := tc.SetKeepAlive(true); err != nil { return nil, err } if err := tc.SetKeepAlivePeriod(3 * time.Minute); err != nil { // Some systems, such as OpenBSD, have no user-settable per-socket TCP keepalive options. if !errors.Is(err, syscall.ENOPROTOOPT) { return nil, err } } return tc, nil } func buildProxyProtocolListener(ctx context.Context, entryPoint *static.EntryPoint, listener net.Listener) (net.Listener, error) { proxyListener := &proxyproto.Listener{Listener: listener} if entryPoint.ProxyProtocol.Insecure { log.Ctx(ctx).Info().Msg("Enabling ProxyProtocol without trusted IPs: Insecure") return proxyListener, nil } checker, err := ip.NewChecker(entryPoint.ProxyProtocol.TrustedIPs) if err != nil { return nil, err } proxyListener.Policy = func(upstream net.Addr) (proxyproto.Policy, error) { ipAddr, ok := upstream.(*net.TCPAddr) if !ok { return proxyproto.REJECT, fmt.Errorf("type error %v", upstream) } if !checker.ContainsIP(ipAddr.IP) { log.Ctx(ctx).Debug().Msgf("IP %s is not in trusted IPs list, ignoring ProxyProtocol Headers and bypass connection", ipAddr.IP) return proxyproto.IGNORE, nil } return proxyproto.USE, nil } log.Ctx(ctx).Info().Msgf("Enabling ProxyProtocol for trusted IPs %v", entryPoint.ProxyProtocol.TrustedIPs) return proxyListener, nil } func buildListener(ctx context.Context, entryPoint *static.EntryPoint) (net.Listener, error) { listener, err := net.Listen("tcp", entryPoint.GetAddress()) if err != nil { return nil, fmt.Errorf("error opening listener: %w", err) } listener = tcpKeepAliveListener{listener.(*net.TCPListener)} if entryPoint.ProxyProtocol != nil { listener, err = buildProxyProtocolListener(ctx, entryPoint, listener) if err != nil { return nil, fmt.Errorf("error creating proxy protocol listener: %w", err) } } return listener, nil } func newConnectionTracker(openConnectionsGauge gokitmetrics.Gauge) *connectionTracker { return &connectionTracker{ conns: make(map[net.Conn]struct{}), openConnectionsGauge: openConnectionsGauge, } } type connectionTracker struct { connsMu sync.RWMutex conns map[net.Conn]struct{} openConnectionsGauge gokitmetrics.Gauge } // AddConnection add a connection in the tracked connections list. func (c *connectionTracker) AddConnection(conn net.Conn) { c.connsMu.Lock() c.conns[conn] = struct{}{} c.connsMu.Unlock() if c.openConnectionsGauge != nil { c.openConnectionsGauge.Add(1) } } // RemoveConnection remove a connection from the tracked connections list. func (c *connectionTracker) RemoveConnection(conn net.Conn) { c.connsMu.Lock() delete(c.conns, conn) c.connsMu.Unlock() if c.openConnectionsGauge != nil { c.openConnectionsGauge.Add(-1) } } func (c *connectionTracker) isEmpty() bool { c.connsMu.RLock() defer c.connsMu.RUnlock() return len(c.conns) == 0 } // Shutdown wait for the connection closing. func (c *connectionTracker) Shutdown(ctx context.Context) error { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for { if c.isEmpty() { return nil } select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: } } } // Close close all the connections in the tracked connections list. func (c *connectionTracker) Close() { c.connsMu.Lock() defer c.connsMu.Unlock() for conn := range c.conns { if err := conn.Close(); err != nil { log.Error().Err(err).Msg("Error while closing connection") } delete(c.conns, conn) } } type stoppable interface { Shutdown(context.Context) error Close() error } type stoppableServer interface { stoppable Serve(listener net.Listener) error } type httpServer struct { Server stoppableServer Forwarder *httpForwarder Switcher *middlewares.HTTPHandlerSwitcher } func createHTTPServer(ctx context.Context, ln net.Listener, configuration *static.EntryPoint, withH2c bool, reqDecorator *requestdecorator.RequestDecorator) (*httpServer, error) { if configuration.HTTP2.MaxConcurrentStreams < 0 { return nil, errors.New("max concurrent streams value must be greater than or equal to zero") } httpSwitcher := middlewares.NewHandlerSwitcher(router.BuildDefaultHTTPRouter()) next, err := alice.New(requestdecorator.WrapHandler(reqDecorator)).Then(httpSwitcher) if err != nil { return nil, err } var handler http.Handler handler, err = forwardedheaders.NewXForwarded( configuration.ForwardedHeaders.Insecure, configuration.ForwardedHeaders.TrustedIPs, next) if err != nil { return nil, err } handler = http.AllowQuerySemicolons(handler) handler = contenttype.DisableAutoDetection(handler) if withH2c { handler = h2c.NewHandler(handler, &http2.Server{ MaxConcurrentStreams: uint32(configuration.HTTP2.MaxConcurrentStreams), }) } serverHTTP := &http.Server{ Handler: handler, ErrorLog: stdlog.New(logs.NoLevel(log.Logger, zerolog.DebugLevel), "", 0), ReadTimeout: time.Duration(configuration.Transport.RespondingTimeouts.ReadTimeout), WriteTimeout: time.Duration(configuration.Transport.RespondingTimeouts.WriteTimeout), IdleTimeout: time.Duration(configuration.Transport.RespondingTimeouts.IdleTimeout), } // ConfigureServer configures HTTP/2 with the MaxConcurrentStreams option for the given server. // Also keeping behavior the same as // https://cs.opensource.google/go/go/+/refs/tags/go1.17.7:src/net/http/server.go;l=3262 if !strings.Contains(os.Getenv("GODEBUG"), "http2server=0") { err = http2.ConfigureServer(serverHTTP, &http2.Server{ MaxConcurrentStreams: uint32(configuration.HTTP2.MaxConcurrentStreams), NewWriteScheduler: func() http2.WriteScheduler { return http2.NewPriorityWriteScheduler(nil) }, }) if err != nil { return nil, fmt.Errorf("configure HTTP/2 server: %w", err) } } listener := newHTTPForwarder(ln) go func() { err := serverHTTP.Serve(listener) if err != nil && !errors.Is(err, http.ErrServerClosed) { log.Ctx(ctx).Error().Err(err).Msg("Error while starting server") } }() return &httpServer{ Server: serverHTTP, Forwarder: listener, Switcher: httpSwitcher, }, nil } func newTrackedConnection(conn tcp.WriteCloser, tracker *connectionTracker) *trackedConnection { tracker.AddConnection(conn) return &trackedConnection{ WriteCloser: conn, tracker: tracker, } } type trackedConnection struct { tracker *connectionTracker tcp.WriteCloser } func (t *trackedConnection) Close() error { t.tracker.RemoveConnection(t.WriteCloser) return t.WriteCloser.Close() }