From 0d5d14d41a4cfc45430838a3a045c08fab97f655 Mon Sep 17 00:00:00 2001 From: Romain Date: Mon, 10 Aug 2020 15:26:04 +0200 Subject: [PATCH] Pilot metrics provider Co-authored-by: Kevin Pollet --- cmd/traefik/traefik.go | 31 ++-- pkg/metrics/pilot.go | 317 ++++++++++++++++++++++++++++++++++ pkg/metrics/pilot_test.go | 355 ++++++++++++++++++++++++++++++++++++++ pkg/pilot/pilot.go | 31 ++-- pkg/pilot/pilot_test.go | 5 +- 5 files changed, 712 insertions(+), 27 deletions(-) create mode 100644 pkg/metrics/pilot.go create mode 100644 pkg/metrics/pilot_test.go diff --git a/cmd/traefik/traefik.go b/cmd/traefik/traefik.go index cb34c7fc5..e14c938e6 100644 --- a/cmd/traefik/traefik.go +++ b/cmd/traefik/traefik.go @@ -195,7 +195,21 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err ctx := context.Background() routinesPool := safe.NewPool(ctx) - metricsRegistry := registerMetricClients(staticConfiguration.Metrics) + metricRegistries := registerMetricClients(staticConfiguration.Metrics) + + var aviator *pilot.Pilot + if isPilotEnabled(staticConfiguration) { + pilotRegistry := metrics.RegisterPilot() + + aviator = pilot.New(staticConfiguration.Experimental.Pilot.Token, pilotRegistry, routinesPool) + routinesPool.GoCtx(func(ctx context.Context) { + aviator.Tick(ctx) + }) + + metricRegistries = append(metricRegistries, pilotRegistry) + } + + metricsRegistry := metrics.NewMultiRegistry(metricRegistries) accessLog := setupAccessLog(staticConfiguration.AccessLog) chainBuilder := middleware.NewChainBuilder(*staticConfiguration, metricsRegistry, accessLog) managerFactory := service.NewManagerFactory(*staticConfiguration, routinesPool, metricsRegistry) @@ -244,15 +258,6 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err metricsRegistry.LastConfigReloadSuccessGauge().Set(float64(time.Now().Unix())) }) - var aviator *pilot.Pilot - if staticConfiguration.Experimental != nil && staticConfiguration.Experimental.Pilot != nil && - staticConfiguration.Experimental.Pilot.Token != "" { - aviator = pilot.New(staticConfiguration.Experimental.Pilot.Token, routinesPool) - routinesPool.GoCtx(func(ctx context.Context) { - aviator.Tick(ctx) - }) - } - watcher.AddListener(switchRouter(routerFactory, acmeProviders, serverEntryPointsTCP, serverEntryPointsUDP, aviator)) watcher.AddListener(func(conf dynamic.Configuration) { @@ -349,9 +354,9 @@ func initACMEProvider(c *static.Configuration, providerAggregator *aggregator.Pr return resolvers } -func registerMetricClients(metricsConfig *types.Metrics) metrics.Registry { +func registerMetricClients(metricsConfig *types.Metrics) []metrics.Registry { if metricsConfig == nil { - return metrics.NewVoidRegistry() + return nil } var registries []metrics.Registry @@ -386,7 +391,7 @@ func registerMetricClients(metricsConfig *types.Metrics) metrics.Registry { metricsConfig.InfluxDB.Address, metricsConfig.InfluxDB.PushInterval) } - return metrics.NewMultiRegistry(registries) + return registries } func setupAccessLog(conf *types.AccessLog) *accesslog.Handler { diff --git a/pkg/metrics/pilot.go b/pkg/metrics/pilot.go new file mode 100644 index 000000000..8bd192a5a --- /dev/null +++ b/pkg/metrics/pilot.go @@ -0,0 +1,317 @@ +package metrics + +import ( + "strings" + "sync" + "time" + + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/generic" +) + +const ( + // server meta information + pilotConfigPrefix = "config" + pilotConfigReloadsTotalName = pilotConfigPrefix + "ReloadsTotal" + pilotConfigReloadsFailuresTotalName = pilotConfigPrefix + "ReloadsFailureTotal" + pilotConfigLastReloadSuccessName = pilotConfigPrefix + "LastReloadSuccess" + pilotConfigLastReloadFailureName = pilotConfigPrefix + "LastReloadFailure" + + // entry point + pilotEntryPointPrefix = "entrypoint" + pilotEntryPointReqsTotalName = pilotEntryPointPrefix + "RequestsTotal" + pilotEntryPointReqsTLSTotalName = pilotEntryPointPrefix + "RequestsTLSTotal" + pilotEntryPointReqDurationName = pilotEntryPointPrefix + "RequestDurationSeconds" + pilotEntryPointOpenConnsName = pilotEntryPointPrefix + "OpenConnections" + + // service level + pilotServicePrefix = "service" + pilotServiceReqsTotalName = pilotServicePrefix + "RequestsTotal" + pilotServiceReqsTLSTotalName = pilotServicePrefix + "RequestsTLSTotal" + pilotServiceReqDurationName = pilotServicePrefix + "RequestDurationSeconds" + pilotServiceOpenConnsName = pilotServicePrefix + "OpenConnections" + pilotServiceRetriesTotalName = pilotServicePrefix + "RetriesTotal" + pilotServiceServerUpName = pilotServicePrefix + "ServerUp" +) + +const root = "value" + +// RegisterPilot registers all Pilot metrics. +func RegisterPilot() *PilotRegistry { + standardRegistry := &standardRegistry{ + epEnabled: true, + svcEnabled: true, + } + + pr := &PilotRegistry{ + standardRegistry: standardRegistry, + counters: make(map[string]*pilotCounter), + gauges: make(map[string]*pilotGauge), + histograms: make(map[string]*pilotHistogram), + } + + standardRegistry.configReloadsCounter = pr.newCounter(pilotConfigReloadsTotalName) + standardRegistry.configReloadsFailureCounter = pr.newCounter(pilotConfigReloadsFailuresTotalName) + standardRegistry.lastConfigReloadSuccessGauge = pr.newGauge(pilotConfigLastReloadSuccessName) + standardRegistry.lastConfigReloadFailureGauge = pr.newGauge(pilotConfigLastReloadFailureName) + + standardRegistry.entryPointReqsCounter = pr.newCounter(pilotEntryPointReqsTotalName) + standardRegistry.entryPointReqsTLSCounter = pr.newCounter(pilotEntryPointReqsTLSTotalName) + standardRegistry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(pr.newHistogram(pilotEntryPointReqDurationName), time.Second) + standardRegistry.entryPointOpenConnsGauge = pr.newGauge(pilotEntryPointOpenConnsName) + + standardRegistry.serviceReqsCounter = pr.newCounter(pilotServiceReqsTotalName) + standardRegistry.serviceReqsTLSCounter = pr.newCounter(pilotServiceReqsTLSTotalName) + standardRegistry.serviceReqDurationHistogram, _ = NewHistogramWithScale(pr.newHistogram(pilotServiceReqDurationName), time.Second) + standardRegistry.serviceOpenConnsGauge = pr.newGauge(pilotServiceOpenConnsName) + standardRegistry.serviceRetriesCounter = pr.newCounter(pilotServiceRetriesTotalName) + standardRegistry.serviceServerUpGauge = pr.newGauge(pilotServiceServerUpName) + + return pr +} + +// PilotMetric is a representation of a metric. +type PilotMetric struct { + Name string `json:"name"` + Type string `json:"type"` + Observations map[string]interface{} `json:"observations"` +} + +type pilotHistogramObservation struct { + Total float64 `json:"total"` + Count float64 `json:"count"` +} + +// PilotRegistry represents the pilots metrics registry. +type PilotRegistry struct { + counters map[string]*pilotCounter + gauges map[string]*pilotGauge + histograms map[string]*pilotHistogram + + *standardRegistry +} + +// newCounter register and returns a new pilotCounter. +func (pr *PilotRegistry) newCounter(name string) *pilotCounter { + c := newPilotCounter(name) + pr.counters[name] = c + + return c +} + +// newGauge register and returns a new pilotGauge. +func (pr *PilotRegistry) newGauge(name string) *pilotGauge { + g := newPilotGauge(name) + pr.gauges[name] = g + + return g +} + +// newHistogram register and returns a new pilotHistogram. +func (pr *PilotRegistry) newHistogram(name string) *pilotHistogram { + h := newPilotHistogram(name) + pr.histograms[name] = h + + return h +} + +// Data exports the metrics: metrics name -> labels -> values. +func (pr *PilotRegistry) Data() []PilotMetric { + var pilotMetrics []PilotMetric + + for name, counter := range pr.counters { + pilotMetric := PilotMetric{ + Name: name, + Type: "COUNTER", + Observations: make(map[string]interface{}), + } + pilotMetrics = append(pilotMetrics, pilotMetric) + + counter.counters.Range(func(key, value interface{}) bool { + labels := key.(string) + pc := value.(*pilotCounter) + + if labels == "" { + labels = root + } + + if labels == root || len(pc.c.LabelValues())%2 == 0 { + pilotMetric.Observations[labels] = pc.c.Value() + } + + return true + }) + } + + for name, gauge := range pr.gauges { + pilotMetric := PilotMetric{ + Name: name, + Type: "GAUGE", + Observations: make(map[string]interface{}), + } + pilotMetrics = append(pilotMetrics, pilotMetric) + + gauge.gauges.Range(func(key, value interface{}) bool { + labels := key.(string) + pg := value.(*pilotGauge) + + if labels == "" { + labels = root + } + + if labels == root || len(pg.g.LabelValues())%2 == 0 { + pilotMetric.Observations[labels] = pg.g.Value() + } + + return true + }) + } + + for name, histogram := range pr.histograms { + pilotMetric := PilotMetric{ + Name: name, + Type: "HISTOGRAM", + Observations: make(map[string]interface{}), + } + pilotMetrics = append(pilotMetrics, pilotMetric) + + histogram.histograms.Range(func(key, value interface{}) bool { + labels := key.(string) + ph := value.(*pilotHistogram) + + if labels == "" { + labels = root + } + + if labels == root || len(ph.labels)%2 == 0 { + pilotMetric.Observations[labels] = &pilotHistogramObservation{ + Total: ph.total.Value(), + Count: ph.count.Value(), + } + } + + return true + }) + } + + return pilotMetrics +} + +type pilotCounter struct { + c *generic.Counter + counters *sync.Map +} + +func newPilotCounter(name string) *pilotCounter { + return &pilotCounter{ + c: generic.NewCounter(name), + counters: &sync.Map{}, + } +} + +// With returns a new pilotCounter with the given labels. +func (c *pilotCounter) With(labels ...string) metrics.Counter { + newCounter := c.c.With(labels...).(*generic.Counter) + newCounter.ValueReset() + + return &pilotCounter{ + c: newCounter, + counters: c.counters, + } +} + +// Add adds the given delta to the counter. +func (c *pilotCounter) Add(delta float64) { + labelsKey := strings.Join(c.c.LabelValues(), ",") + + pc, _ := c.counters.LoadOrStore(labelsKey, c) + + pc.(*pilotCounter).c.Add(delta) +} + +type pilotGauge struct { + g *generic.Gauge + gauges *sync.Map +} + +func newPilotGauge(name string) *pilotGauge { + return &pilotGauge{ + g: generic.NewGauge(name), + gauges: &sync.Map{}, + } +} + +// With returns a new pilotGauge with the given labels. +func (g *pilotGauge) With(labels ...string) metrics.Gauge { + newGauge := g.g.With(labels...).(*generic.Gauge) + newGauge.Set(0) + + return &pilotGauge{ + g: newGauge, + gauges: g.gauges, + } +} + +// Set sets the given value to the gauge. +func (g *pilotGauge) Set(value float64) { + labelsKey := strings.Join(g.g.LabelValues(), ",") + + pg, _ := g.gauges.LoadOrStore(labelsKey, g) + + pg.(*pilotGauge).g.Set(value) +} + +// Add adds the given delta to the gauge. +func (g *pilotGauge) Add(delta float64) { + labelsKey := strings.Join(g.g.LabelValues(), ",") + + pg, _ := g.gauges.LoadOrStore(labelsKey, g) + + pg.(*pilotGauge).g.Add(delta) +} + +type pilotHistogram struct { + name string + labels []string + count *generic.Counter + total *generic.Counter + histograms *sync.Map +} + +func newPilotHistogram(name string) *pilotHistogram { + return &pilotHistogram{ + name: name, + labels: make([]string, 0), + count: &generic.Counter{}, + total: &generic.Counter{}, + histograms: &sync.Map{}, + } +} + +// With returns a new pilotHistogram with the given labels. +func (h *pilotHistogram) With(labels ...string) metrics.Histogram { + var newLabels []string + + newLabels = append(newLabels, h.labels...) + newLabels = append(newLabels, labels...) + + return &pilotHistogram{ + name: h.name, + labels: newLabels, + count: &generic.Counter{}, + total: &generic.Counter{}, + histograms: h.histograms, + } +} + +// Observe records a new value into the histogram. +func (h *pilotHistogram) Observe(value float64) { + labelsKey := strings.Join(h.labels, ",") + + ph, _ := h.histograms.LoadOrStore(labelsKey, h) + + pHisto := ph.(*pilotHistogram) + + pHisto.count.Add(1) + pHisto.total.Add(value) +} diff --git a/pkg/metrics/pilot_test.go b/pkg/metrics/pilot_test.go new file mode 100644 index 000000000..73ffb0335 --- /dev/null +++ b/pkg/metrics/pilot_test.go @@ -0,0 +1,355 @@ +package metrics + +import ( + "net/http" + "strconv" + "strings" + "testing" + "time" + + "github.com/go-kit/kit/metrics" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPilotCounter(t *testing.T) { + rootCounter := newPilotCounter("rootCounter") + + // Checks that a counter without labels can be incremented. + rootCounter.Add(1) + assertPilotCounterValue(t, 1.0, "", rootCounter) + + // Checks that a counter with labels can be incremented. + counterWithLabels := rootCounter.With("foo", "bar", "foo", "buz") + + counterWithLabels.Add(1) + assertPilotCounterValue(t, 1.0, "foo,bar,foo,buz", counterWithLabels) + + // Checks that the derived counter value has not changed. + assertPilotCounterValue(t, 1.0, "", rootCounter) + + // Checks that an existing counter (with the same labels) can be incremented. + existingCounterWithLabels := rootCounter.With("foo", "bar").With("foo", "buz") + + existingCounterWithLabels.Add(1) + assertPilotCounterValue(t, 2.0, "foo,bar,foo,buz", existingCounterWithLabels) +} + +func assertPilotCounterValue(t *testing.T, expValue float64, labels string, c metrics.Counter) { + t.Helper() + counter, ok := c.(*pilotCounter).counters.Load(labels) + + require.True(t, ok) + assert.Equal(t, expValue, counter.(*pilotCounter).c.Value()) +} + +func TestPilotGauge(t *testing.T) { + rootGauge := newPilotGauge("rootGauge") + + // Checks that a gauge without labels can be incremented. + rootGauge.Add(1) + + assertPilotGaugeValue(t, 1.0, "", rootGauge) + + // Checks that a gauge (without labels) value can be set. + rootGauge.Set(5.0) + + assertPilotGaugeValue(t, 5.0, "", rootGauge) + + // Checks that a gauge with labels can be incremented. + gaugeWithLabels := rootGauge.With("foo", "bar", "foo", "buz") + gaugeWithLabels.Add(1) + + assertPilotGaugeValue(t, 1.0, "foo,bar,foo,buz", gaugeWithLabels) + + // Checks that the derived gauge value has not changed. + assertPilotGaugeValue(t, 5.0, "", rootGauge) + + // Checks that an existing gauge (with the same labels) can be incremented. + existingGaugeWithLabels := rootGauge.With("foo", "bar").With("foo", "buz") + existingGaugeWithLabels.Add(1) + + assertPilotGaugeValue(t, 2.0, "foo,bar,foo,buz", existingGaugeWithLabels) +} + +func assertPilotGaugeValue(t *testing.T, expValue float64, labels string, g metrics.Gauge) { + t.Helper() + gauge, ok := g.(*pilotGauge).gauges.Load(labels) + + require.True(t, ok) + assert.Equal(t, expValue, gauge.(*pilotGauge).g.Value()) +} + +func TestPilotHistogram(t *testing.T) { + rootHistogram := newPilotHistogram("rootHistogram") + + // Checks that an histogram without labels can be updated. + rootHistogram.Observe(1) + + assertPilotHistogramValues(t, 1.0, 1.0, "", rootHistogram) + + rootHistogram.Observe(2) + + assertPilotHistogramValues(t, 2.0, 3.0, "", rootHistogram) + + // Checks that an histogram with labels can be updated. + histogramWithLabels := rootHistogram.With("foo", "bar", "foo", "buz") + histogramWithLabels.Observe(1) + + assertPilotHistogramValues(t, 1.0, 1.0, "foo,bar,foo,buz", histogramWithLabels) + + // Checks that the derived histogram has not changed. + assertPilotHistogramValues(t, 2.0, 3.0, "", rootHistogram) + + // Checks that an existing histogram (with the same labels) can be updated. + existingHistogramWithLabels := rootHistogram.With("foo", "bar").With("foo", "buz") + existingHistogramWithLabels.Observe(1) + + assertPilotHistogramValues(t, 2.0, 2.0, "foo,bar,foo,buz", existingHistogramWithLabels) +} + +func assertPilotHistogramValues(t *testing.T, expCount, expTotal float64, labels string, h metrics.Histogram) { + t.Helper() + histogram, ok := h.(*pilotHistogram).histograms.Load(labels) + + require.True(t, ok) + assert.Equal(t, expCount, histogram.(*pilotHistogram).count.Value()) + assert.Equal(t, expTotal, histogram.(*pilotHistogram).total.Value()) +} + +func TestPilotMetrics(t *testing.T) { + pilotRegistry := RegisterPilot() + + if !pilotRegistry.IsEpEnabled() || !pilotRegistry.IsSvcEnabled() { + t.Errorf("PilotRegistry should return true for IsEnabled() and IsSvcEnabled()") + } + + pilotRegistry.ConfigReloadsCounter().Add(1) + pilotRegistry.ConfigReloadsFailureCounter().Add(1) + pilotRegistry.LastConfigReloadSuccessGauge().Set(float64(time.Now().Unix())) + pilotRegistry.LastConfigReloadFailureGauge().Set(float64(time.Now().Unix())) + + pilotRegistry. + EntryPointReqsCounter(). + With("code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http"). + Add(1) + pilotRegistry. + EntryPointReqDurationHistogram(). + With("code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http"). + Observe(1) + pilotRegistry. + EntryPointOpenConnsGauge(). + With("method", http.MethodGet, "protocol", "http", "entrypoint", "http"). + Set(1) + + pilotRegistry. + ServiceReqsCounter(). + With("service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http"). + Add(1) + pilotRegistry. + ServiceReqDurationHistogram(). + With("service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http"). + Observe(10000) + pilotRegistry. + ServiceOpenConnsGauge(). + With("service", "service1", "method", http.MethodGet, "protocol", "http"). + Set(1) + pilotRegistry. + ServiceRetriesCounter(). + With("service", "service1"). + Add(1) + pilotRegistry. + ServiceServerUpGauge(). + With("service", "service1", "url", "http://127.0.0.10:80"). + Set(1) + + data := pilotRegistry.Data() + + testCases := []struct { + name string + labels map[string]string + assert func(*PilotMetric) + }{ + { + name: pilotConfigReloadsTotalName, + assert: buildPilotCounterAssert(t, pilotConfigReloadsTotalName, 1), + }, + { + name: pilotConfigReloadsFailuresTotalName, + assert: buildPilotCounterAssert(t, pilotConfigReloadsFailuresTotalName, 1), + }, + { + name: pilotConfigLastReloadSuccessName, + assert: buildPilotTimestampAssert(t, pilotConfigLastReloadSuccessName), + }, + { + name: pilotConfigLastReloadFailureName, + assert: buildPilotTimestampAssert(t, pilotConfigLastReloadFailureName), + }, + { + name: pilotEntryPointReqsTotalName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "entrypoint": "http", + }, + assert: buildPilotCounterAssert(t, pilotEntryPointReqsTotalName, 1), + }, + { + name: pilotEntryPointReqDurationName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "entrypoint": "http", + }, + assert: buildPilotHistogramAssert(t, pilotEntryPointReqDurationName, 1), + }, + { + name: pilotEntryPointOpenConnsName, + labels: map[string]string{ + "method": http.MethodGet, + "protocol": "http", + "entrypoint": "http", + }, + assert: buildPilotGaugeAssert(t, pilotEntryPointOpenConnsName, 1), + }, + { + name: pilotServiceReqsTotalName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "service": "service1", + }, + assert: buildPilotCounterAssert(t, pilotServiceReqsTotalName, 1), + }, + { + name: pilotServiceReqDurationName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "service": "service1", + }, + assert: buildPilotHistogramAssert(t, pilotServiceReqDurationName, 1), + }, + { + name: pilotServiceOpenConnsName, + labels: map[string]string{ + "method": http.MethodGet, + "protocol": "http", + "service": "service1", + }, + assert: buildPilotGaugeAssert(t, pilotServiceOpenConnsName, 1), + }, + { + name: pilotServiceRetriesTotalName, + labels: map[string]string{ + "service": "service1", + }, + assert: buildPilotGreaterThanCounterAssert(t, pilotServiceRetriesTotalName, 1), + }, + { + name: pilotServiceServerUpName, + labels: map[string]string{ + "service": "service1", + "url": "http://127.0.0.10:80", + }, + assert: buildPilotGaugeAssert(t, pilotServiceServerUpName, 1), + }, + } + + for _, test := range testCases { + test := test + t.Run(test.name, func(t *testing.T) { + metric := findPilotMetric(test.name, data) + if metric == nil { + t.Errorf("metrics do not contain %q", test.name) + return + } + + for labels := range metric.Observations { + if len(labels)%2 == 0 { + splitLabels := strings.Split(labels, ",") + for i := 0; i < len(splitLabels); i += 2 { + label := splitLabels[i] + value := splitLabels[i+1] + val, ok := test.labels[label] + if !ok { + t.Errorf("%q metric contains unexpected label %q", test.name, label) + } else if val != value { + t.Errorf("label %q in metric %q has wrong value %q, expected %q", label, test.name, value, val) + } + } + } + } + test.assert(metric) + }) + } +} + +func findPilotMetric(name string, metrics []PilotMetric) *PilotMetric { + for _, metric := range metrics { + if metric.Name == name { + return &metric + } + } + return nil +} + +func buildPilotCounterAssert(t *testing.T, metricName string, expectedValue float64) func(metric *PilotMetric) { + return func(metric *PilotMetric) { + for _, value := range metric.Observations { + if cv := value.(float64); cv != expectedValue { + t.Errorf("metric %s has value %f, want %f", metricName, cv, expectedValue) + } + break + } + } +} + +func buildPilotGreaterThanCounterAssert(t *testing.T, metricName string, expectedMinValue float64) func(metric *PilotMetric) { + return func(metric *PilotMetric) { + for _, value := range metric.Observations { + if cv := value.(float64); cv < expectedMinValue { + t.Errorf("metric %s has value %f, want at least %f", metricName, cv, expectedMinValue) + } + break + } + } +} + +func buildPilotHistogramAssert(t *testing.T, metricName string, expectedSampleCount float64) func(metric *PilotMetric) { + return func(metric *PilotMetric) { + for _, value := range metric.Observations { + if pho := value.(*pilotHistogramObservation); pho.Count != expectedSampleCount { + t.Errorf("metric %s has sample count value %f, want %f", metricName, pho, expectedSampleCount) + } + break + } + } +} + +func buildPilotGaugeAssert(t *testing.T, metricName string, expectedValue float64) func(metric *PilotMetric) { + return func(metric *PilotMetric) { + for _, value := range metric.Observations { + if gv := value.(float64); gv != expectedValue { + t.Errorf("metric %s has value %f, want %f", metricName, gv, expectedValue) + } + break + } + } +} + +func buildPilotTimestampAssert(t *testing.T, metricName string) func(metric *PilotMetric) { + return func(metric *PilotMetric) { + for _, value := range metric.Observations { + if ts := time.Unix(int64(value.(float64)), 0); time.Since(ts) > time.Minute { + t.Errorf("metric %s has wrong timestamp %v", metricName, ts) + } + break + } + } +} diff --git a/pkg/pilot/pilot.go b/pkg/pilot/pilot.go index 5be1d6fe1..9f230916f 100644 --- a/pkg/pilot/pilot.go +++ b/pkg/pilot/pilot.go @@ -12,6 +12,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/containous/traefik/v2/pkg/config/runtime" "github.com/containous/traefik/v2/pkg/log" + "github.com/containous/traefik/v2/pkg/metrics" "github.com/containous/traefik/v2/pkg/safe" "github.com/containous/traefik/v2/pkg/version" ) @@ -44,10 +45,11 @@ type serviceInfoRepresentation struct { type instanceInfo struct { ID string `json:"id,omitempty"` Configuration RunTimeRepresentation `json:"configuration,omitempty"` + Metrics []metrics.PilotMetric `json:"metrics,omitempty"` } // New creates a new Pilot. -func New(token string, pool *safe.Pool) *Pilot { +func New(token string, metricsRegistry *metrics.PilotRegistry, pool *safe.Pool) *Pilot { return &Pilot{ rtConfChan: make(chan *runtime.Configuration), client: &client{ @@ -55,7 +57,8 @@ func New(token string, pool *safe.Pool) *Pilot { httpClient: &http.Client{Timeout: 5 * time.Second}, baseURL: baseURL, }, - routinesPool: pool, + routinesPool: pool, + metricsRegistry: metricsRegistry, } } @@ -64,8 +67,9 @@ type Pilot struct { routinesPool *safe.Pool client *client - rtConf *runtime.Configuration - rtConfChan chan *runtime.Configuration + rtConf *runtime.Configuration + rtConfChan chan *runtime.Configuration + metricsRegistry *metrics.PilotRegistry } // SetRuntimeConfiguration stores the runtime configuration. @@ -99,8 +103,8 @@ func (p *Pilot) getRepresentation() RunTimeRepresentation { return result } -func (p *Pilot) sendData(ctx context.Context, conf RunTimeRepresentation) { - err := p.client.SendData(ctx, conf) +func (p *Pilot) sendData(ctx context.Context, conf RunTimeRepresentation, pilotMetrics []metrics.PilotMetric) { + err := p.client.SendData(ctx, conf, pilotMetrics) if err != nil { log.WithoutContext().Error(err) } @@ -117,9 +121,10 @@ func (p *Pilot) Tick(ctx context.Context) { } conf := p.getRepresentation() + pilotMetrics := p.metricsRegistry.Data() p.routinesPool.GoCtx(func(ctxRt context.Context) { - p.sendData(ctxRt, conf) + p.sendData(ctxRt, conf, pilotMetrics) }) ticker := time.NewTicker(pilotTimer) @@ -129,9 +134,10 @@ func (p *Pilot) Tick(ctx context.Context) { log.WithoutContext().Debugf("Send to pilot: %s", tick) conf := p.getRepresentation() + pilotMetrics := p.metricsRegistry.Data() p.routinesPool.GoCtx(func(ctxRt context.Context) { - p.sendData(ctxRt, conf) + p.sendData(ctxRt, conf, pilotMetrics) }) case rtConf := <-p.rtConfChan: p.rtConf = rtConf @@ -184,13 +190,13 @@ func (c *client) createUUID() (string, error) { } // SendData sends data to Pilot. -func (c *client) SendData(ctx context.Context, rtConf RunTimeRepresentation) error { +func (c *client) SendData(ctx context.Context, rtConf RunTimeRepresentation, pilotMetrics []metrics.PilotMetric) error { exponentialBackOff := backoff.NewExponentialBackOff() exponentialBackOff.MaxElapsedTime = maxElapsedTime return backoff.RetryNotify( func() error { - return c.sendData(rtConf) + return c.sendData(rtConf, pilotMetrics) }, backoff.WithContext(exponentialBackOff, ctx), func(err error, duration time.Duration) { @@ -198,7 +204,7 @@ func (c *client) SendData(ctx context.Context, rtConf RunTimeRepresentation) err }) } -func (c *client) sendData(_ RunTimeRepresentation) error { +func (c *client) sendData(_ RunTimeRepresentation, pilotMetrics []metrics.PilotMetric) error { if len(c.uuid) == 0 { var err error c.uuid, err = c.createUUID() @@ -210,7 +216,8 @@ func (c *client) sendData(_ RunTimeRepresentation) error { } info := instanceInfo{ - ID: c.uuid, + ID: c.uuid, + Metrics: pilotMetrics, } b, err := json.Marshal(info) diff --git a/pkg/pilot/pilot_test.go b/pkg/pilot/pilot_test.go index 8ba61ec94..8c0aa8d99 100644 --- a/pkg/pilot/pilot_test.go +++ b/pkg/pilot/pilot_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/containous/traefik/v2/pkg/config/runtime" + "github.com/containous/traefik/v2/pkg/metrics" "github.com/containous/traefik/v2/pkg/safe" "github.com/stretchr/testify/require" ) @@ -43,7 +44,7 @@ func TestTick(t *testing.T) { receivedConfig <- true }) - pilot := New("token", safe.NewPool(context.Background())) + pilot := New("token", metrics.RegisterPilot(), safe.NewPool(context.Background())) pilot.client.baseURL = server.URL ctx, cancel := context.WithCancel(context.Background()) @@ -118,6 +119,6 @@ func TestClient_SendConfiguration(t *testing.T) { token: myToken, } - err := client.SendData(context.Background(), RunTimeRepresentation{}) + err := client.SendData(context.Background(), RunTimeRepresentation{}, []metrics.PilotMetric{}) require.NoError(t, err) }