Metrics: Add support for InfluxDB Database / RetentionPolicy and HTTP client

This commit is contained in:
Drew Kerrigan 2018-05-29 16:58:03 -04:00 committed by Traefiker Bot
parent a7200a292b
commit 67a0b4b4b1
6 changed files with 217 additions and 12 deletions

View file

@ -78,6 +78,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
},
InfluxDB: &types.InfluxDB{
Address: "localhost:8089",
Protocol: "udp",
PushInterval: "10s",
},
}
@ -262,6 +263,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
},
InfluxDB: &types.InfluxDB{
Address: "localhost:8089",
Protocol: "udp",
PushInterval: "10s",
},
}

View file

@ -185,6 +185,13 @@ pushinterval = "10s"
#
address = "localhost:8089"
# InfluxDB's address protocol (udp or http)
#
# Required
# Default: "udp"
#
protocol = "udp"
# InfluxDB push interval
#
# Optional
@ -192,6 +199,20 @@ address = "localhost:8089"
#
pushinterval = "10s"
# InfluxDB database used when protocol is http
#
# Optional
# Default: ""
#
database = ""
# InfluxDB retention policy used when protocol is http
#
# Optional
# Default: ""
#
retentionpolicy = ""
# ...
```

View file

@ -80,6 +80,7 @@
# ...
```
### InfluxDB
```toml
@ -96,6 +97,13 @@
#
address = "localhost:8089"
# InfluxDB's address protocol (udp or http)
#
# Required
# Default: "udp"
#
protocol = "udp"
# InfluxDB push interval
#
# Optional
@ -103,6 +111,20 @@
#
pushinterval = "10s"
# InfluxDB database used when protocol is http
#
# Optional
# Default: ""
#
database = ""
# InfluxDB retention policy used when protocol is http
#
# Optional
# Default: ""
#
retentionpolicy = ""
# ...
```

View file

@ -2,6 +2,9 @@ package metrics
import (
"bytes"
"fmt"
"net/url"
"regexp"
"time"
"github.com/containous/traefik/log"
@ -12,10 +15,7 @@ import (
influxdb "github.com/influxdata/influxdb/client/v2"
)
var influxDBClient = influx.New(map[string]string{}, influxdb.BatchPointsConfig{}, kitlog.LoggerFunc(func(keyvals ...interface{}) error {
log.Info(keyvals)
return nil
}))
var influxDBClient *influx.Influx
type influxDBWriter struct {
buf bytes.Buffer
@ -41,6 +41,9 @@ const (
// RegisterInfluxDB registers the metrics pusher if this didn't happen yet and creates a InfluxDB Registry instance.
func RegisterInfluxDB(config *types.InfluxDB) Registry {
if influxDBClient == nil {
influxDBClient = initInfluxDBClient(config)
}
if influxDBTicker == nil {
influxDBTicker = initInfluxDBTicker(config)
}
@ -62,7 +65,48 @@ func RegisterInfluxDB(config *types.InfluxDB) Registry {
}
}
// initInfluxDBTicker initializes metrics pusher and creates a influxDBClient if not created already
// initInfluxDBTicker creates a influxDBClient
func initInfluxDBClient(config *types.InfluxDB) *influx.Influx {
// TODO deprecated: move this switch into configuration.SetEffectiveConfiguration when web provider will be removed.
switch config.Protocol {
case "udp":
if len(config.Database) > 0 || len(config.RetentionPolicy) > 0 {
log.Warn("Database and RetentionPolicy are only used when protocol is http.")
config.Database = ""
config.RetentionPolicy = ""
}
case "http":
if u, err := url.Parse(config.Address); err == nil {
if u.Scheme != "http" && u.Scheme != "https" {
log.Warnf("InfluxDB address %s should specify a scheme of http or https, defaulting to http.", config.Address)
config.Address = "http://" + config.Address
}
} else {
log.Errorf("Unable to parse influxdb address: %v, defaulting to udp.", err)
config.Protocol = "udp"
config.Database = ""
config.RetentionPolicy = ""
}
default:
log.Warnf("Unsupported protocol: %s, defaulting to udp.", config.Protocol)
config.Protocol = "udp"
config.Database = ""
config.RetentionPolicy = ""
}
return influx.New(
map[string]string{},
influxdb.BatchPointsConfig{
Database: config.Database,
RetentionPolicy: config.RetentionPolicy,
},
kitlog.LoggerFunc(func(keyvals ...interface{}) error {
log.Info(keyvals)
return nil
}))
}
// initInfluxDBTicker initializes metrics pusher
func initInfluxDBTicker(config *types.InfluxDB) *time.Ticker {
pushInterval, err := time.ParseDuration(config.PushInterval)
if err != nil {
@ -88,16 +132,69 @@ func StopInfluxDB() {
influxDBTicker = nil
}
// Write creates a http or udp client and attempts to write BatchPoints.
// If a "database not found" error is encountered, a CREATE DATABASE
// query is attempted when using protocol http.
func (w *influxDBWriter) Write(bp influxdb.BatchPoints) error {
c, err := influxdb.NewUDPClient(influxdb.UDPConfig{
Addr: w.config.Address,
})
c, err := w.initWriteClient()
if err != nil {
return err
}
defer c.Close()
return c.Write(bp)
if writeErr := c.Write(bp); writeErr != nil {
log.Errorf("Error writing to influx: %s", writeErr.Error())
if handleErr := w.handleWriteError(c, writeErr); handleErr != nil {
return handleErr
}
// Retry write after successful handling of writeErr
return c.Write(bp)
}
return nil
}
func (w *influxDBWriter) initWriteClient() (c influxdb.Client, err error) {
if w.config.Protocol == "http" {
c, err = influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: w.config.Address,
})
} else {
c, err = influxdb.NewUDPClient(influxdb.UDPConfig{
Addr: w.config.Address,
})
}
return
}
func (w *influxDBWriter) handleWriteError(c influxdb.Client, writeErr error) error {
if w.config.Protocol != "http" {
return writeErr
}
match, matchErr := regexp.MatchString("database not found", writeErr.Error())
if matchErr != nil || !match {
return writeErr
}
qStr := fmt.Sprintf("CREATE DATABASE \"%s\"", w.config.Database)
if w.config.RetentionPolicy != "" {
qStr = fmt.Sprintf("%s WITH NAME \"%s\"", qStr, w.config.RetentionPolicy)
}
log.Debugf("Influx database does not exist, attempting to create with query: %s", qStr)
q := influxdb.NewQuery(qStr, "", "")
response, queryErr := c.Query(q)
if queryErr == nil && response.Error() != nil {
queryErr = response.Error()
}
if queryErr != nil {
log.Errorf("Error creating InfluxDB database: %s", queryErr)
return queryErr
}
log.Debugf("Successfully created influx database: %s", w.config.Database)
return nil
}

View file

@ -1,7 +1,10 @@
package metrics
import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"regexp"
"strconv"
"testing"
@ -62,6 +65,63 @@ func TestInfluxDB(t *testing.T) {
assertMessage(t, msgEntrypoint, expectedEntrypoint)
}
func TestInfluxDBHTTP(t *testing.T) {
c := make(chan *string)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "can't read body "+err.Error(), http.StatusBadRequest)
return
}
bodyStr := string(body)
c <- &bodyStr
fmt.Fprintln(w, "ok")
}))
defer ts.Close()
influxDBRegistry := RegisterInfluxDB(&types.InfluxDB{Address: ts.URL, Protocol: "http", PushInterval: "1s", Database: "test", RetentionPolicy: "autogen"})
defer StopInfluxDB()
if !influxDBRegistry.IsEnabled() {
t.Fatalf("InfluxDB registry must be enabled")
}
expectedBackend := []string{
`(traefik\.backend\.requests\.total,backend=test,code=200,method=GET count=1) [\d]{19}`,
`(traefik\.backend\.requests\.total,backend=test,code=404,method=GET count=1) [\d]{19}`,
`(traefik\.backend\.request\.duration,backend=test,code=200 p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`,
`(traefik\.backend\.retries\.total(?:,code=[\d]{3},method=GET)?,backend=test count=2) [\d]{19}`,
`(traefik\.config\.reload\.total(?:[a-z=0-9A-Z,]+)? count=1) [\d]{19}`,
`(traefik\.config\.reload\.total\.failure(?:[a-z=0-9A-Z,]+)? count=1) [\d]{19}`,
`(traefik\.backend\.server\.up,backend=test(?:[a-z=0-9A-Z,]+)?,url=http://127.0.0.1 value=1) [\d]{19}`,
}
influxDBRegistry.BackendReqsCounter().With("backend", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDBRegistry.BackendReqsCounter().With("backend", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
influxDBRegistry.BackendRetriesCounter().With("backend", "test").Add(1)
influxDBRegistry.BackendRetriesCounter().With("backend", "test").Add(1)
influxDBRegistry.BackendReqDurationHistogram().With("backend", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
influxDBRegistry.ConfigReloadsCounter().Add(1)
influxDBRegistry.ConfigReloadsFailureCounter().Add(1)
influxDBRegistry.BackendServerUpGauge().With("backend", "test", "url", "http://127.0.0.1").Set(1)
msgBackend := <-c
assertMessage(t, *msgBackend, expectedBackend)
expectedEntrypoint := []string{
`(traefik\.entrypoint\.requests\.total,entrypoint=test(?:[a-z=0-9A-Z,:/.]+)? count=1) [\d]{19}`,
`(traefik\.entrypoint\.request\.duration(?:,code=[\d]{3})?,entrypoint=test(?:[a-z=0-9A-Z,:/.]+)? p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`,
`(traefik\.entrypoint\.connections\.open,entrypoint=test value=1) [\d]{19}`,
}
influxDBRegistry.EntrypointReqsCounter().With("entrypoint", "test").Add(1)
influxDBRegistry.EntrypointReqDurationHistogram().With("entrypoint", "test").Observe(10000)
influxDBRegistry.EntrypointOpenConnsGauge().With("entrypoint", "test").Set(1)
msgEntrypoint := <-c
assertMessage(t, *msgEntrypoint, expectedEntrypoint)
}
func assertMessage(t *testing.T, msg string, patterns []string) {
t.Helper()
for _, pattern := range patterns {

View file

@ -443,8 +443,11 @@ type Statsd struct {
// InfluxDB contains address and metrics pushing interval configuration
type InfluxDB struct {
Address string `description:"InfluxDB address"`
PushInterval string `description:"InfluxDB push interval"`
Address string `description:"InfluxDB address"`
Protocol string `description:"InfluxDB address protocol (udp or http)"`
PushInterval string `description:"InfluxDB push interval" export:"true"`
Database string `description:"InfluxDB database used when protocol is http" export:"true"`
RetentionPolicy string `description:"InfluxDB retention policy used when protocol is http" export:"true"`
}
// Buckets holds Prometheus Buckets