2020-02-11 01:26:04 +01:00
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"net"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2020-09-16 15:46:04 +02:00
|
|
|
"github.com/traefik/traefik/v2/pkg/config/static"
|
|
|
|
"github.com/traefik/traefik/v2/pkg/log"
|
|
|
|
"github.com/traefik/traefik/v2/pkg/udp"
|
2020-02-11 01:26:04 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
// UDPEntryPoints maps UDP entry points by their names.
|
|
|
|
type UDPEntryPoints map[string]*UDPEntryPoint
|
|
|
|
|
|
|
|
// NewUDPEntryPoints returns all the UDP entry points, keyed by name.
|
|
|
|
func NewUDPEntryPoints(cfg static.EntryPoints) (UDPEntryPoints, error) {
|
|
|
|
entryPoints := make(UDPEntryPoints)
|
|
|
|
for entryPointName, entryPoint := range cfg {
|
|
|
|
protocol, err := entryPoint.GetProtocol()
|
|
|
|
if err != nil {
|
2020-05-11 12:06:07 +02:00
|
|
|
return nil, fmt.Errorf("error while building entryPoint %s: %w", entryPointName, err)
|
2020-02-11 01:26:04 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
if protocol != "udp" {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
ep, err := NewUDPEntryPoint(entryPoint)
|
|
|
|
if err != nil {
|
2020-05-11 12:06:07 +02:00
|
|
|
return nil, fmt.Errorf("error while building entryPoint %s: %w", entryPointName, err)
|
2020-02-11 01:26:04 +01:00
|
|
|
}
|
|
|
|
entryPoints[entryPointName] = ep
|
|
|
|
}
|
|
|
|
return entryPoints, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start commences the listening for all the entry points.
|
|
|
|
func (eps UDPEntryPoints) Start() {
|
|
|
|
for entryPointName, ep := range eps {
|
|
|
|
ctx := log.With(context.Background(), log.Str(log.EntryPointName, entryPointName))
|
|
|
|
go ep.Start(ctx)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Stop makes all the entry points stop listening, and release associated resources.
|
|
|
|
func (eps UDPEntryPoints) Stop() {
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
|
|
for epn, ep := range eps {
|
|
|
|
wg.Add(1)
|
|
|
|
|
|
|
|
go func(entryPointName string, entryPoint *UDPEntryPoint) {
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
ctx := log.With(context.Background(), log.Str(log.EntryPointName, entryPointName))
|
|
|
|
entryPoint.Shutdown(ctx)
|
|
|
|
|
|
|
|
log.FromContext(ctx).Debugf("Entry point %s closed", entryPointName)
|
|
|
|
}(epn, ep)
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Switch swaps out all the given handlers in their associated entrypoints.
|
|
|
|
func (eps UDPEntryPoints) Switch(handlers map[string]udp.Handler) {
|
|
|
|
for epName, handler := range handlers {
|
|
|
|
if ep, ok := eps[epName]; ok {
|
|
|
|
ep.Switch(handler)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
log.WithoutContext().Errorf("EntryPoint %q does not exist", epName)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// UDPEntryPoint is an entry point where we listen for UDP packets.
|
|
|
|
type UDPEntryPoint struct {
|
|
|
|
listener *udp.Listener
|
|
|
|
switcher *udp.HandlerSwitcher
|
|
|
|
transportConfiguration *static.EntryPointsTransport
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewUDPEntryPoint returns a UDP entry point.
|
|
|
|
func NewUDPEntryPoint(cfg *static.EntryPoint) (*UDPEntryPoint, error) {
|
|
|
|
addr, err := net.ResolveUDPAddr("udp", cfg.GetAddress())
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-01-07 11:16:03 -05:00
|
|
|
|
|
|
|
listener, err := udp.Listen("udp", addr, time.Duration(cfg.UDP.Timeout))
|
2020-02-11 01:26:04 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &UDPEntryPoint{listener: listener, switcher: &udp.HandlerSwitcher{}, transportConfiguration: cfg.Transport}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start commences the listening for ep.
|
|
|
|
func (ep *UDPEntryPoint) Start(ctx context.Context) {
|
|
|
|
log.FromContext(ctx).Debug("Start UDP Server")
|
|
|
|
for {
|
|
|
|
conn, err := ep.listener.Accept()
|
|
|
|
if err != nil {
|
|
|
|
// Only errClosedListener can happen that's why we return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
go ep.switcher.ServeUDP(conn)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown closes ep's listener. It eventually closes all "sessions" and
|
|
|
|
// releases associated resources, but only after it has waited for a graceTimeout,
|
|
|
|
// if any was configured.
|
|
|
|
func (ep *UDPEntryPoint) Shutdown(ctx context.Context) {
|
|
|
|
logger := log.FromContext(ctx)
|
|
|
|
|
|
|
|
reqAcceptGraceTimeOut := time.Duration(ep.transportConfiguration.LifeCycle.RequestAcceptGraceTimeout)
|
|
|
|
if reqAcceptGraceTimeOut > 0 {
|
|
|
|
logger.Infof("Waiting %s for incoming requests to cease", reqAcceptGraceTimeOut)
|
|
|
|
time.Sleep(reqAcceptGraceTimeOut)
|
|
|
|
}
|
|
|
|
|
|
|
|
graceTimeOut := time.Duration(ep.transportConfiguration.LifeCycle.GraceTimeOut)
|
|
|
|
if err := ep.listener.Shutdown(graceTimeOut); err != nil {
|
|
|
|
logger.Error(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Switch replaces ep's handler with the one given as argument.
|
|
|
|
func (ep *UDPEntryPoint) Switch(handler udp.Handler) {
|
|
|
|
ep.switcher.Switch(handler)
|
|
|
|
}
|