traefik/pkg/server/service/service.go

396 lines
13 KiB
Go
Raw Normal View History

2018-11-14 09:18:03 +00:00
package service
import (
"context"
2023-11-29 11:20:57 +00:00
"encoding/hex"
"errors"
2018-11-14 09:18:03 +00:00
"fmt"
"hash/fnv"
"math/rand"
2018-11-14 09:18:03 +00:00
"net/http"
"net/http/httputil"
"net/url"
2019-08-26 17:00:04 +00:00
"reflect"
"strings"
2018-11-14 09:18:03 +00:00
"time"
2024-01-08 08:10:06 +00:00
"github.com/containous/alice"
2022-11-21 17:36:05 +00:00
"github.com/rs/zerolog/log"
2023-02-03 14:24:05 +00:00
"github.com/traefik/traefik/v3/pkg/config/dynamic"
"github.com/traefik/traefik/v3/pkg/config/runtime"
"github.com/traefik/traefik/v3/pkg/healthcheck"
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
2023-02-03 14:24:05 +00:00
metricsMiddle "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
2024-03-12 08:48:04 +00:00
"github.com/traefik/traefik/v3/pkg/middlewares/observability"
2023-02-03 14:24:05 +00:00
"github.com/traefik/traefik/v3/pkg/safe"
"github.com/traefik/traefik/v3/pkg/server/cookie"
"github.com/traefik/traefik/v3/pkg/server/middleware"
2023-02-03 14:24:05 +00:00
"github.com/traefik/traefik/v3/pkg/server/provider"
"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/failover"
"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/mirror"
"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/wrr"
2018-11-14 09:18:03 +00:00
)
// defaultMaxBodySize is the body size limit handed to mirror.New when the
// mirroring configuration leaves MaxBodySize unset.
// NOTE(review): -1 presumably means "no limit" — confirm against the mirror package.
const defaultMaxBodySize int64 = -1
2020-09-11 13:40:03 +00:00
// RoundTripperGetter is a roundtripper getter interface.
// The Manager uses it to resolve the ServersTransport name of a load-balancer
// service into the http.RoundTripper used by that service's server proxies.
type RoundTripperGetter interface {
	// Get returns the http.RoundTripper for the given name, or an error.
	Get(name string) (http.RoundTripper, error)
}
// Manager The service manager.
// It builds http.Handler instances out of service configurations and keeps the
// handlers it has already built, keyed by qualified service name.
type Manager struct {
	// routinePool is passed to mirror.New when building mirroring services.
	routinePool *safe.Pool
	// observabilityMgr is queried to decide which observability wrappers
	// (tracing, metrics, access logs) are added around each server proxy.
	observabilityMgr *middleware.ObservabilityMgr
	// bufferPool is handed to every single-host proxy built for a server.
	bufferPool httputil.BufferPool
	// roundTripperManager resolves a ServersTransport name into a round tripper.
	roundTripperManager RoundTripperGetter

	// services holds the handlers already built by BuildHTTP, by qualified service name.
	services map[string]http.Handler
	// configs holds the service configurations BuildHTTP builds from, by qualified service name.
	configs map[string]*runtime.ServiceInfo
	// healthCheckers holds the health checkers created for load-balancer services
	// that define a health check; LaunchHealthCheck starts them.
	healthCheckers map[string]*healthcheck.ServiceHealthChecker
	rand           *rand.Rand // For the initial shuffling of load-balancers.
}
2020-05-11 10:06:07 +00:00
// NewManager creates a new Manager.
func NewManager(configs map[string]*runtime.ServiceInfo, observabilityMgr *middleware.ObservabilityMgr, routinePool *safe.Pool, roundTripperManager RoundTripperGetter) *Manager {
2018-11-14 09:18:03 +00:00
return &Manager{
2019-08-26 17:00:04 +00:00
routinePool: routinePool,
observabilityMgr: observabilityMgr,
2018-11-14 09:18:03 +00:00
bufferPool: newBufferPool(),
2020-09-11 13:40:03 +00:00
roundTripperManager: roundTripperManager,
services: make(map[string]http.Handler),
2018-11-14 09:18:03 +00:00
configs: configs,
healthCheckers: make(map[string]*healthcheck.ServiceHealthChecker),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
2018-11-14 09:18:03 +00:00
}
}
// BuildHTTP Creates a http.Handler for a service configuration.
func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string) (http.Handler, error) {
2022-11-21 17:36:05 +00:00
serviceName = provider.GetQualifiedName(rootCtx, serviceName)
2018-11-14 09:18:03 +00:00
2022-11-21 17:36:05 +00:00
ctx := log.Ctx(rootCtx).With().Str(logs.ServiceName, serviceName).Logger().
WithContext(provider.AddInContext(rootCtx, serviceName))
handler, ok := m.services[serviceName]
if ok {
return handler, nil
}
conf, ok := m.configs[serviceName]
if !ok {
return nil, fmt.Errorf("the service %q does not exist", serviceName)
}
if conf.Status == runtime.StatusDisabled {
return nil, errors.New(strings.Join(conf.Err, ", "))
}
2019-08-26 17:00:04 +00:00
value := reflect.ValueOf(*conf.Service)
var count int
2024-02-19 14:44:03 +00:00
for i := range value.NumField() {
2019-08-26 17:00:04 +00:00
if !value.Field(i).IsNil() {
count++
}
}
if count > 1 {
err := errors.New("cannot create service: multi-types service not supported, consider declaring two different pieces of service instead")
conf.AddError(err, true)
return nil, err
}
var lb http.Handler
switch {
case conf.LoadBalancer != nil:
var err error
lb, err = m.getLoadBalancerServiceHandler(ctx, serviceName, conf)
if err != nil {
conf.AddError(err, true)
return nil, err
}
case conf.Weighted != nil:
var err error
lb, err = m.getWRRServiceHandler(ctx, serviceName, conf.Weighted)
if err != nil {
conf.AddError(err, true)
return nil, err
}
2019-08-26 17:00:04 +00:00
case conf.Mirroring != nil:
var err error
lb, err = m.getMirrorServiceHandler(ctx, conf.Mirroring)
2019-08-26 17:00:04 +00:00
if err != nil {
conf.AddError(err, true)
return nil, err
}
case conf.Failover != nil:
var err error
lb, err = m.getFailoverServiceHandler(ctx, serviceName, conf.Failover)
if err != nil {
conf.AddError(err, true)
return nil, err
}
default:
sErr := fmt.Errorf("the service %q does not have any type defined", serviceName)
conf.AddError(sErr, true)
return nil, sErr
2018-11-14 09:18:03 +00:00
}
m.services[serviceName] = lb
return lb, nil
}
// getFailoverServiceHandler builds the handler of a failover service.
//
// The main service (config.Service) and the fallback service (config.Fallback)
// are built through BuildHTTP and attached to a new failover handler. The main
// service must be a healthcheck.StatusUpdater so that its status changes are
// forwarded to the failover handler. The fallback service is subject to the
// same requirement only when the failover service itself defines a health check.
func (m *Manager) getFailoverServiceHandler(ctx context.Context, serviceName string, config *dynamic.Failover) (http.Handler, error) {
	fo := failover.New(config.HealthCheck)

	// subscribe registers notify as a status listener on the child handler.
	subscribe := func(childName string, child http.Handler, notify func(up bool)) error {
		statusUpdater, isUpdater := child.(healthcheck.StatusUpdater)
		if !isUpdater {
			return fmt.Errorf("child service %v of %v not a healthcheck.StatusUpdater (%T)", childName, serviceName, child)
		}

		if err := statusUpdater.RegisterStatusUpdater(notify); err != nil {
			return fmt.Errorf("cannot register %v as updater for %v: %w", childName, serviceName, err)
		}

		return nil
	}

	mainHandler, err := m.BuildHTTP(ctx, config.Service)
	if err != nil {
		return nil, err
	}
	fo.SetHandler(mainHandler)

	err = subscribe(config.Service, mainHandler, func(up bool) {
		fo.SetHandlerStatus(ctx, up)
	})
	if err != nil {
		return nil, err
	}

	backupHandler, err := m.BuildHTTP(ctx, config.Fallback)
	if err != nil {
		return nil, err
	}
	fo.SetFallbackHandler(backupHandler)

	// Do not report the health of the fallback handler.
	if config.HealthCheck == nil {
		return fo, nil
	}

	err = subscribe(config.Fallback, backupHandler, func(up bool) {
		fo.SetFallbackHandlerStatus(ctx, up)
	})
	if err != nil {
		return nil, err
	}

	return fo, nil
}
func (m *Manager) getMirrorServiceHandler(ctx context.Context, config *dynamic.Mirroring) (http.Handler, error) {
serviceHandler, err := m.BuildHTTP(ctx, config.Service)
2019-08-26 17:00:04 +00:00
if err != nil {
return nil, err
}
maxBodySize := defaultMaxBodySize
if config.MaxBodySize != nil {
maxBodySize = *config.MaxBodySize
}
handler := mirror.New(serviceHandler, m.routinePool, maxBodySize, config.HealthCheck)
2019-08-26 17:00:04 +00:00
for _, mirrorConfig := range config.Mirrors {
mirrorHandler, err := m.BuildHTTP(ctx, mirrorConfig.Name)
2019-08-26 17:00:04 +00:00
if err != nil {
return nil, err
}
err = handler.AddMirror(mirrorHandler, mirrorConfig.Percent)
if err != nil {
return nil, err
}
}
return handler, nil
}
func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string, config *dynamic.WeightedRoundRobin) (http.Handler, error) {
// TODO Handle accesslog and metrics with multiple service name
if config.Sticky != nil && config.Sticky.Cookie != nil {
config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName)
}
balancer := wrr.New(config.Sticky, config.HealthCheck != nil)
for _, service := range shuffle(config.Services, m.rand) {
if service.Status != nil {
serviceHandler := http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(*service.Status)
})
balancer.Add(service.Name, serviceHandler, service.Weight)
continue
}
serviceHandler, err := m.BuildHTTP(ctx, service.Name)
if err != nil {
return nil, err
}
balancer.Add(service.Name, serviceHandler, service.Weight)
if config.HealthCheck == nil {
continue
}
childName := service.Name
updater, ok := serviceHandler.(healthcheck.StatusUpdater)
if !ok {
return nil, fmt.Errorf("child service %v of %v not a healthcheck.StatusUpdater (%T)", childName, serviceName, serviceHandler)
}
if err := updater.RegisterStatusUpdater(func(up bool) {
balancer.SetStatus(ctx, childName, up)
}); err != nil {
return nil, fmt.Errorf("cannot register %v as updater for %v: %w", childName, serviceName, err)
}
2022-11-21 17:36:05 +00:00
log.Ctx(ctx).Debug().Str("parent", serviceName).Str("child", childName).
Msg("Child service will update parent on status change")
}
return balancer, nil
2018-11-14 09:18:03 +00:00
}
func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName string, info *runtime.ServiceInfo) (http.Handler, error) {
service := info.LoadBalancer
2022-11-21 17:36:05 +00:00
logger := log.Ctx(ctx)
logger.Debug().Msg("Creating load-balancer")
2020-09-11 13:40:03 +00:00
// TODO: should we keep this config value as Go is now handling stream response correctly?
flushInterval := dynamic.DefaultFlushInterval
if service.ResponseForwarding != nil {
flushInterval = service.ResponseForwarding.FlushInterval
2020-09-11 13:40:03 +00:00
}
if len(service.ServersTransport) > 0 {
service.ServersTransport = provider.GetQualifiedName(ctx, service.ServersTransport)
2018-11-14 09:18:03 +00:00
}
if service.Sticky != nil && service.Sticky.Cookie != nil {
service.Sticky.Cookie.Name = cookie.GetName(service.Sticky.Cookie.Name, serviceName)
2019-07-18 19:36:05 +00:00
}
2018-11-14 09:18:03 +00:00
// We make sure that the PassHostHeader value is defined to avoid panics.
passHostHeader := dynamic.DefaultPassHostHeader
if service.PassHostHeader != nil {
passHostHeader = *service.PassHostHeader
2018-11-14 09:18:03 +00:00
}
roundTripper, err := m.roundTripperManager.Get(service.ServersTransport)
2018-11-14 09:18:03 +00:00
if err != nil {
return nil, err
}
lb := wrr.New(service.Sticky, service.HealthCheck != nil)
healthCheckTargets := make(map[string]*url.URL)
2018-11-14 09:18:03 +00:00
for _, server := range shuffle(service.Servers, m.rand) {
hasher := fnv.New64a()
_, _ = hasher.Write([]byte(server.URL)) // this will never return an error.
2018-11-14 09:18:03 +00:00
2023-11-29 11:20:57 +00:00
proxyName := hex.EncodeToString(hasher.Sum(nil))
2018-11-14 09:18:03 +00:00
target, err := url.Parse(server.URL)
if err != nil {
return nil, fmt.Errorf("error parsing server URL %s: %w", server.URL, err)
2018-11-14 09:18:03 +00:00
}
2022-11-21 17:36:05 +00:00
logger.Debug().Str(logs.ServerName, proxyName).Stringer("target", target).
Msg("Creating server")
2022-09-20 14:54:08 +00:00
2024-03-12 08:48:04 +00:00
qualifiedSvcName := provider.GetQualifiedName(ctx, serviceName)
if m.observabilityMgr.ShouldAddTracing(qualifiedSvcName) || m.observabilityMgr.ShouldAddMetrics(qualifiedSvcName) {
// Wrapping the roundTripper with the Tracing roundTripper,
// to handle the reverseProxy client span creation.
roundTripper = newObservabilityRoundTripper(m.observabilityMgr.SemConvMetricsRegistry(), roundTripper)
}
proxy := buildSingleHostProxy(target, passHostHeader, time.Duration(flushInterval), roundTripper, m.bufferPool)
2018-11-14 09:18:03 +00:00
// Prevents from enabling observability for internal resources.
2019-06-05 20:18:06 +00:00
2024-03-12 08:48:04 +00:00
if m.observabilityMgr.ShouldAddAccessLogs(qualifiedSvcName) {
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceURL, target.String(), nil)
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceAddr, target.Host, nil)
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceName, serviceName, accesslog.AddServiceFields)
}
if m.observabilityMgr.MetricsRegistry() != nil && m.observabilityMgr.MetricsRegistry().IsSvcEnabled() &&
2024-03-12 08:48:04 +00:00
m.observabilityMgr.ShouldAddMetrics(qualifiedSvcName) {
metricsHandler := metricsMiddle.WrapServiceHandler(ctx, m.observabilityMgr.MetricsRegistry(), serviceName)
2024-01-08 08:10:06 +00:00
proxy, err = alice.New().
2024-03-12 08:48:04 +00:00
Append(observability.WrapMiddleware(ctx, metricsHandler)).
2024-01-08 08:10:06 +00:00
Then(proxy)
if err != nil {
return nil, fmt.Errorf("error wrapping metrics handler: %w", err)
}
}
2024-03-12 08:48:04 +00:00
if m.observabilityMgr.ShouldAddTracing(qualifiedSvcName) {
proxy = observability.NewService(ctx, serviceName, proxy)
}
2024-01-08 08:10:06 +00:00
if m.observabilityMgr.ShouldAddAccessLogs(qualifiedSvcName) || m.observabilityMgr.ShouldAddMetrics(qualifiedSvcName) {
// Some piece of middleware, like the ErrorPage, are relying on this serviceBuilder to get the handler for a given service,
// to re-target the request to it.
// Those pieces of middleware can be configured on routes that expose a Traefik internal service.
// In such a case, observability for internals being optional, the capture probe could be absent from context (no wrap via the entrypoint).
// But if the service targeted by this piece of middleware is not an internal one,
// and requires observability, we still want the capture probe to be present in the request context.
// Makes sure a capture probe is in the request context.
proxy, _ = capture.Wrap(proxy)
}
2024-01-26 00:44:05 +00:00
lb.Add(proxyName, proxy, server.Weight)
2020-03-23 10:24:05 +00:00
// servers are considered UP by default.
info.UpdateServerStatus(target.String(), runtime.StatusUp)
2018-11-14 09:18:03 +00:00
healthCheckTargets[proxyName] = target
2018-11-14 09:18:03 +00:00
}
if service.HealthCheck != nil {
m.healthCheckers[serviceName] = healthcheck.NewServiceHealthChecker(
ctx,
m.observabilityMgr.MetricsRegistry(),
service.HealthCheck,
lb,
info,
roundTripper,
healthCheckTargets,
serviceName,
)
2018-11-14 09:18:03 +00:00
}
return lb, nil
2018-11-14 09:18:03 +00:00
}
2020-03-23 10:24:05 +00:00
// LaunchHealthCheck launches the health checks.
func (m *Manager) LaunchHealthCheck(ctx context.Context) {
for serviceName, hc := range m.healthCheckers {
2022-11-21 17:36:05 +00:00
logger := log.Ctx(ctx).With().Str(logs.ServiceName, serviceName).Logger()
go hc.Launch(logger.WithContext(ctx))
2020-03-23 10:24:05 +00:00
}
}
// shuffle returns a shuffled copy of values, using r as the source of
// randomness. The input slice is left untouched, and the result is never nil,
// even for a nil or empty input.
func shuffle[T any](values []T, r *rand.Rand) []T {
	out := make([]T, len(values))
	count := copy(out, values)

	swap := func(a, b int) {
		out[a], out[b] = out[b], out[a]
	}
	r.Shuffle(count, swap)

	return out
}