traefik/pkg/middlewares/metrics/metrics.go
2024-01-08 09:10:06 +01:00

271 lines
9.4 KiB
Go

package metrics
import (
"context"
"net/http"
"strconv"
"strings"
"time"
"unicode/utf8"
"github.com/containous/alice"
gokitmetrics "github.com/go-kit/kit/metrics"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares"
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
"github.com/traefik/traefik/v3/pkg/middlewares/retry"
traefiktls "github.com/traefik/traefik/v3/pkg/tls"
"github.com/traefik/traefik/v3/pkg/tracing"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
)
const (
protoHTTP = "http"
protoGRPC = "grpc"
protoGRPCWeb = "grpc-web"
protoSSE = "sse"
protoWebsocket = "websocket"
typeName = "Metrics"
nameEntrypoint = "metrics-entrypoint"
nameRouter = "metrics-router"
nameService = "metrics-service"
)
type metricsMiddleware struct {
next http.Handler
reqsCounter metrics.CounterWithHeaders
reqsTLSCounter gokitmetrics.Counter
reqDurationHistogram metrics.ScalableHistogram
reqsBytesCounter gokitmetrics.Counter
respsBytesCounter gokitmetrics.Counter
baseLabels []string
name string
}
// NewEntryPointMiddleware creates a new metrics middleware for an Entrypoint.
func NewEntryPointMiddleware(ctx context.Context, next http.Handler, registry metrics.Registry, entryPointName string) http.Handler {
middlewares.GetLogger(ctx, nameEntrypoint, typeName).Debug().Msg("Creating middleware")
return &metricsMiddleware{
next: next,
reqsCounter: registry.EntryPointReqsCounter(),
reqsTLSCounter: registry.EntryPointReqsTLSCounter(),
reqDurationHistogram: registry.EntryPointReqDurationHistogram(),
reqsBytesCounter: registry.EntryPointReqsBytesCounter(),
respsBytesCounter: registry.EntryPointRespsBytesCounter(),
baseLabels: []string{"entrypoint", entryPointName},
name: nameEntrypoint,
}
}
// NewRouterMiddleware creates a new metrics middleware for a Router.
func NewRouterMiddleware(ctx context.Context, next http.Handler, registry metrics.Registry, routerName string, serviceName string) http.Handler {
middlewares.GetLogger(ctx, nameRouter, typeName).Debug().Msg("Creating middleware")
return &metricsMiddleware{
next: next,
reqsCounter: registry.RouterReqsCounter(),
reqsTLSCounter: registry.RouterReqsTLSCounter(),
reqDurationHistogram: registry.RouterReqDurationHistogram(),
reqsBytesCounter: registry.RouterReqsBytesCounter(),
respsBytesCounter: registry.RouterRespsBytesCounter(),
baseLabels: []string{"router", routerName, "service", serviceName},
name: nameRouter,
}
}
// NewServiceMiddleware creates a new metrics middleware for a Service.
func NewServiceMiddleware(ctx context.Context, next http.Handler, registry metrics.Registry, serviceName string) http.Handler {
middlewares.GetLogger(ctx, nameService, typeName).Debug().Msg("Creating middleware")
return &metricsMiddleware{
next: next,
reqsCounter: registry.ServiceReqsCounter(),
reqsTLSCounter: registry.ServiceReqsTLSCounter(),
reqDurationHistogram: registry.ServiceReqDurationHistogram(),
reqsBytesCounter: registry.ServiceReqsBytesCounter(),
respsBytesCounter: registry.ServiceRespsBytesCounter(),
baseLabels: []string{"service", serviceName},
name: nameService,
}
}
// WrapEntryPointHandler Wraps metrics entrypoint to alice.Constructor.
func WrapEntryPointHandler(ctx context.Context, registry metrics.Registry, entryPointName string) alice.Constructor {
return func(next http.Handler) (http.Handler, error) {
return NewEntryPointMiddleware(ctx, next, registry, entryPointName), nil
}
}
// WrapRouterHandler Wraps metrics router to alice.Constructor.
func WrapRouterHandler(ctx context.Context, registry metrics.Registry, routerName string, serviceName string) alice.Constructor {
return func(next http.Handler) (http.Handler, error) {
return NewRouterMiddleware(ctx, next, registry, routerName, serviceName), nil
}
}
// WrapServiceHandler Wraps metrics service to alice.Constructor.
func WrapServiceHandler(ctx context.Context, registry metrics.Registry, serviceName string) alice.Constructor {
return func(next http.Handler) (http.Handler, error) {
return NewServiceMiddleware(ctx, next, registry, serviceName), nil
}
}
func (m *metricsMiddleware) GetTracingInformation() (string, string, trace.SpanKind) {
return m.name, typeName, trace.SpanKindInternal
}
func (m *metricsMiddleware) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
proto := getRequestProtocol(req)
var labels []string
labels = append(labels, m.baseLabels...)
labels = append(labels, "method", getMethod(req))
labels = append(labels, "protocol", proto)
// TLS metrics
if req.TLS != nil {
var tlsLabels []string
tlsLabels = append(tlsLabels, m.baseLabels...)
tlsLabels = append(tlsLabels, "tls_version", traefiktls.GetVersion(req.TLS), "tls_cipher", traefiktls.GetCipherName(req.TLS))
m.reqsTLSCounter.With(tlsLabels...).Add(1)
}
ctx := req.Context()
capt, err := capture.FromContext(ctx)
if err != nil {
with := log.Ctx(ctx).With()
for i := 0; i < len(m.baseLabels); i += 2 {
with = with.Str(m.baseLabels[i], m.baseLabels[i+1])
}
logger := with.Logger()
logger.Error().Err(err).Msg("Could not get Capture")
tracing.SetStatusErrorf(req.Context(), "Could not get Capture")
return
}
next := m.next
if capt.NeedsReset(rw) {
next = capt.Reset(m.next)
}
start := time.Now()
next.ServeHTTP(rw, req)
code := capt.StatusCode()
if proto == protoGRPC || proto == protoGRPCWeb {
code = grpcStatusCode(rw)
}
labels = append(labels, "code", strconv.Itoa(code))
m.reqDurationHistogram.With(labels...).ObserveFromStart(start)
m.reqsCounter.With(req.Header, labels...).Add(1)
m.respsBytesCounter.With(labels...).Add(float64(capt.ResponseSize()))
m.reqsBytesCounter.With(labels...).Add(float64(capt.RequestSize()))
}
func getRequestProtocol(req *http.Request) string {
switch {
case isWebsocketRequest(req):
return protoWebsocket
case isSSERequest(req):
return protoSSE
case isGRPCWebRequest(req):
return protoGRPCWeb
case isGRPCRequest(req):
return protoGRPC
default:
return protoHTTP
}
}
// isWebsocketRequest determines if the specified HTTP request is a websocket handshake request.
func isWebsocketRequest(req *http.Request) bool {
return containsHeader(req, "Connection", "upgrade") && containsHeader(req, "Upgrade", "websocket")
}
// isSSERequest determines if the specified HTTP request is a request for an event subscription.
func isSSERequest(req *http.Request) bool {
return containsHeader(req, "Accept", "text/event-stream")
}
// isGRPCWebRequest determines if the specified HTTP request is a gRPC-Web request.
func isGRPCWebRequest(req *http.Request) bool {
return strings.HasPrefix(req.Header.Get("Content-Type"), "application/grpc-web")
}
// isGRPCRequest determines if the specified HTTP request is a gRPC request.
func isGRPCRequest(req *http.Request) bool {
return strings.HasPrefix(req.Header.Get("Content-Type"), "application/grpc")
}
// grpcStatusCode parses and returns the gRPC status code from the Grpc-Status header.
func grpcStatusCode(rw http.ResponseWriter) int {
code := codes.Unknown
if status := rw.Header().Get("Grpc-Status"); status != "" {
if err := code.UnmarshalJSON([]byte(status)); err != nil {
return int(code)
}
}
return int(code)
}
func containsHeader(req *http.Request, name, value string) bool {
items := strings.Split(req.Header.Get(name), ",")
for _, item := range items {
if value == strings.ToLower(strings.TrimSpace(item)) {
return true
}
}
return false
}
// getMethod returns the request's method.
// It checks whether the method is a valid UTF-8 string.
// To restrict the (potentially infinite) number of accepted values for the method,
// and avoid unbounded memory issues,
// values that are not part of the set of HTTP verbs are replaced with EXTENSION_METHOD.
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods
// https://datatracker.ietf.org/doc/html/rfc2616/#section-5.1.1.
//
//nolint:usestdlibvars
func getMethod(r *http.Request) string {
if !utf8.ValidString(r.Method) {
log.Warn().Msgf("Invalid HTTP method encoding: %s", r.Method)
return "NON_UTF8_HTTP_METHOD"
}
switch r.Method {
case "HEAD", "GET", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE", // https://datatracker.ietf.org/doc/html/rfc7231#section-4
"PATCH", // https://datatracker.ietf.org/doc/html/rfc5789#section-2
"PRI": // https://datatracker.ietf.org/doc/html/rfc7540#section-11.6
return r.Method
default:
return "EXTENSION_METHOD"
}
}
type retryMetrics interface {
ServiceRetriesCounter() gokitmetrics.Counter
}
// NewRetryListener instantiates a MetricsRetryListener with the given retryMetrics.
func NewRetryListener(retryMetrics retryMetrics, serviceName string) retry.Listener {
return &RetryListener{retryMetrics: retryMetrics, serviceName: serviceName}
}
// RetryListener is an implementation of the RetryListener interface to
// record RequestMetrics about retry attempts.
type RetryListener struct {
retryMetrics retryMetrics
serviceName string
}
// Retried tracks the retry in the RequestMetrics implementation.
func (m *RetryListener) Retried(_ *http.Request, _ int) {
m.retryMetrics.ServiceRetriesCounter().With("service", m.serviceName).Add(1)
}