traefik/pkg/metrics/influxdb2.go
2023-03-21 16:45:33 +01:00

174 lines
6.6 KiB
Go

package metrics
import (
"context"
"errors"
"time"
"github.com/go-kit/kit/metrics/influx"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2api "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
influxdb2log "github.com/influxdata/influxdb-client-go/v2/log"
influxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/safe"
"github.com/traefik/traefik/v3/pkg/types"
)
var (
influxDB2Ticker *time.Ticker
influxDB2Store *influx.Influx
influxDB2Client influxdb2.Client
)
const (
influxDBConfigReloadsName = "traefik.config.reload.total"
influxDBLastConfigReloadSuccessName = "traefik.config.reload.lastSuccessTimestamp"
influxDBOpenConnsName = "traefik.open.connections"
influxDBTLSCertsNotAfterTimestampName = "traefik.tls.certs.notAfterTimestamp"
influxDBEntryPointReqsName = "traefik.entrypoint.requests.total"
influxDBEntryPointReqsTLSName = "traefik.entrypoint.requests.tls.total"
influxDBEntryPointReqDurationName = "traefik.entrypoint.request.duration"
influxDBEntryPointReqsBytesName = "traefik.entrypoint.requests.bytes.total"
influxDBEntryPointRespsBytesName = "traefik.entrypoint.responses.bytes.total"
influxDBRouterReqsName = "traefik.router.requests.total"
influxDBRouterReqsTLSName = "traefik.router.requests.tls.total"
influxDBRouterReqsDurationName = "traefik.router.request.duration"
influxDBRouterReqsBytesName = "traefik.router.requests.bytes.total"
influxDBRouterRespsBytesName = "traefik.router.responses.bytes.total"
influxDBServiceReqsName = "traefik.service.requests.total"
influxDBServiceReqsTLSName = "traefik.service.requests.tls.total"
influxDBServiceReqsDurationName = "traefik.service.request.duration"
influxDBServiceRetriesTotalName = "traefik.service.retries.total"
influxDBServiceServerUpName = "traefik.service.server.up"
influxDBServiceReqsBytesName = "traefik.service.requests.bytes.total"
influxDBServiceRespsBytesName = "traefik.service.responses.bytes.total"
)
// RegisterInfluxDB2 creates metrics exporter for InfluxDB2.
func RegisterInfluxDB2(ctx context.Context, config *types.InfluxDB2) Registry {
logger := log.Ctx(ctx)
if influxDB2Client == nil {
var err error
if influxDB2Client, err = newInfluxDB2Client(config); err != nil {
logger.Error().Err(err).Send()
return nil
}
}
if influxDB2Store == nil {
influxDB2Store = influx.New(
config.AdditionalLabels,
influxdb.BatchPointsConfig{},
logs.NewGoKitWrapper(*logger),
)
influxDB2Ticker = time.NewTicker(time.Duration(config.PushInterval))
safe.Go(func() {
wc := influxDB2Client.WriteAPIBlocking(config.Org, config.Bucket)
influxDB2Store.WriteLoop(ctx, influxDB2Ticker.C, influxDB2Writer{wc: wc})
})
}
registry := &standardRegistry{
configReloadsCounter: influxDB2Store.NewCounter(influxDBConfigReloadsName),
lastConfigReloadSuccessGauge: influxDB2Store.NewGauge(influxDBLastConfigReloadSuccessName),
openConnectionsGauge: influxDB2Store.NewGauge(influxDBOpenConnsName),
tlsCertsNotAfterTimestampGauge: influxDB2Store.NewGauge(influxDBTLSCertsNotAfterTimestampName),
}
if config.AddEntryPointsLabels {
registry.epEnabled = config.AddEntryPointsLabels
registry.entryPointReqsCounter = NewCounterWithNoopHeaders(influxDB2Store.NewCounter(influxDBEntryPointReqsName))
registry.entryPointReqsTLSCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsTLSName)
registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBEntryPointReqDurationName), time.Second)
registry.entryPointReqsBytesCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsBytesName)
registry.entryPointRespsBytesCounter = influxDB2Store.NewCounter(influxDBEntryPointRespsBytesName)
}
if config.AddRoutersLabels {
registry.routerEnabled = config.AddRoutersLabels
registry.routerReqsCounter = NewCounterWithNoopHeaders(influxDB2Store.NewCounter(influxDBRouterReqsName))
registry.routerReqsTLSCounter = influxDB2Store.NewCounter(influxDBRouterReqsTLSName)
registry.routerReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBRouterReqsDurationName), time.Second)
registry.routerReqsBytesCounter = influxDB2Store.NewCounter(influxDBRouterReqsBytesName)
registry.routerRespsBytesCounter = influxDB2Store.NewCounter(influxDBRouterRespsBytesName)
}
if config.AddServicesLabels {
registry.svcEnabled = config.AddServicesLabels
registry.serviceReqsCounter = NewCounterWithNoopHeaders(influxDB2Store.NewCounter(influxDBServiceReqsName))
registry.serviceReqsTLSCounter = influxDB2Store.NewCounter(influxDBServiceReqsTLSName)
registry.serviceReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBServiceReqsDurationName), time.Second)
registry.serviceRetriesCounter = influxDB2Store.NewCounter(influxDBServiceRetriesTotalName)
registry.serviceServerUpGauge = influxDB2Store.NewGauge(influxDBServiceServerUpName)
registry.serviceReqsBytesCounter = influxDB2Store.NewCounter(influxDBServiceReqsBytesName)
registry.serviceRespsBytesCounter = influxDB2Store.NewCounter(influxDBServiceRespsBytesName)
}
return registry
}
// StopInfluxDB2 stops and resets InfluxDB2 client, ticker and store.
func StopInfluxDB2() {
if influxDB2Client != nil {
influxDB2Client.Close()
}
influxDB2Client = nil
if influxDB2Ticker != nil {
influxDB2Ticker.Stop()
}
influxDB2Ticker = nil
influxDB2Store = nil
}
// newInfluxDB2Client creates an influxdb2.Client.
func newInfluxDB2Client(config *types.InfluxDB2) (influxdb2.Client, error) {
if config.Token == "" || config.Org == "" || config.Bucket == "" {
return nil, errors.New("token, org or bucket property is missing")
}
// Disable InfluxDB2 logs.
// See https://github.com/influxdata/influxdb-client-go/blob/v2.7.0/options.go#L128
influxdb2log.Log = nil
return influxdb2.NewClient(config.Address, config.Token), nil
}
type influxDB2Writer struct {
wc influxdb2api.WriteAPIBlocking
}
func (w influxDB2Writer) Write(bp influxdb.BatchPoints) error {
logger := log.With().Str(logs.MetricsProviderName, "influxdb2").Logger()
wps := make([]*write.Point, 0, len(bp.Points()))
for _, p := range bp.Points() {
fields, err := p.Fields()
if err != nil {
logger.Error().Err(err).Msgf("Error while getting %s point fields", p.Name())
continue
}
wps = append(wps, influxdb2.NewPoint(
p.Name(),
p.Tags(),
fields,
p.Time(),
))
}
ctx := logger.WithContext(context.Background())
return w.wc.WritePoint(ctx, wps...)
}