traefik/pkg/provider/kv/kv.go

167 lines
4.6 KiB
Go
Raw Permalink Normal View History

package kv
import (
"context"
"errors"
"fmt"
"path"
"time"
2020-02-28 23:13:44 +00:00
"github.com/cenkalti/backoff/v4"
"github.com/kvtools/valkeyrie"
"github.com/kvtools/valkeyrie/store"
2022-11-21 17:36:05 +00:00
"github.com/rs/zerolog/log"
2023-02-03 14:24:05 +00:00
"github.com/traefik/traefik/v3/pkg/config/dynamic"
"github.com/traefik/traefik/v3/pkg/config/kv"
"github.com/traefik/traefik/v3/pkg/job"
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/safe"
)
// Provider holds configurations of the provider.
type Provider struct {
2022-09-12 15:40:09 +00:00
RootKey string `description:"Root key used for KV store." json:"rootKey,omitempty" toml:"rootKey,omitempty" yaml:"rootKey,omitempty"`
Endpoints []string `description:"KV store endpoints." json:"endpoints,omitempty" toml:"endpoints,omitempty" yaml:"endpoints,omitempty"`
name string
kvClient store.Store
}
// SetDefaults sets the default values.
func (p *Provider) SetDefaults() {
p.RootKey = "traefik"
}
2020-05-11 10:06:07 +00:00
// Init the provider.
2022-09-12 15:40:09 +00:00
func (p *Provider) Init(storeType, name string, config valkeyrie.Config) error {
2022-11-21 17:36:05 +00:00
ctx := log.With().Str(logs.ProviderName, name).Logger().WithContext(context.Background())
p.name = name
2022-09-12 15:40:09 +00:00
kvClient, err := p.createKVClient(ctx, storeType, config)
if err != nil {
return fmt.Errorf("failed to Connect to KV store: %w", err)
}
p.kvClient = kvClient
return nil
}
// Provide allows the docker provider to provide configurations to traefik using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
2022-11-21 17:36:05 +00:00
logger := log.With().Str(logs.ProviderName, p.name).Logger()
ctx := logger.WithContext(context.Background())
operation := func() error {
2022-08-11 13:42:07 +00:00
if _, err := p.kvClient.Exists(ctx, path.Join(p.RootKey, "qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"), nil); err != nil {
return fmt.Errorf("KV store connection error: %w", err)
}
return nil
}
notify := func(err error, time time.Duration) {
2022-11-21 17:36:05 +00:00
logger.Error().Err(err).Msgf("KV connection error, retrying in %s", time)
}
2022-11-21 17:36:05 +00:00
2022-08-11 13:42:07 +00:00
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctx), notify)
if err != nil {
return fmt.Errorf("cannot connect to KV server: %w", err)
}
2022-08-11 13:42:07 +00:00
configuration, err := p.buildConfiguration(ctx)
if err != nil {
2022-11-21 17:36:05 +00:00
logger.Error().Err(err).Msg("Cannot build the configuration")
} else {
configurationChan <- dynamic.Message{
ProviderName: p.name,
Configuration: configuration,
}
}
2020-02-10 15:03:39 +00:00
pool.GoCtx(func(ctxPool context.Context) {
2022-11-21 17:36:05 +00:00
ctxLog := logger.With().Str(logs.ProviderName, p.name).Logger().WithContext(ctxPool)
2020-02-10 15:03:39 +00:00
err := p.watchKv(ctxLog, configurationChan)
if err != nil {
2022-11-30 08:50:05 +00:00
logger.Error().Err(err).Msg("Cannot retrieve data")
}
})
return nil
}
2020-02-10 15:03:39 +00:00
func (p *Provider) watchKv(ctx context.Context, configurationChan chan<- dynamic.Message) error {
operation := func() error {
2022-08-11 13:42:07 +00:00
events, err := p.kvClient.WatchTree(ctx, p.RootKey, nil)
if err != nil {
return fmt.Errorf("failed to watch KV: %w", err)
}
for {
select {
2020-02-10 15:03:39 +00:00
case <-ctx.Done():
return nil
case _, ok := <-events:
if !ok {
return errors.New("the WatchTree channel is closed")
}
2022-08-11 13:42:07 +00:00
configuration, errC := p.buildConfiguration(ctx)
if errC != nil {
return errC
}
if configuration != nil {
configurationChan <- dynamic.Message{
2020-01-22 17:26:03 +00:00
ProviderName: p.name,
Configuration: configuration,
}
}
}
}
}
notify := func(err error, time time.Duration) {
2022-11-30 08:50:05 +00:00
log.Ctx(ctx).Error().Err(err).Msgf("Provider error, retrying in %s", time)
}
2020-02-10 15:03:39 +00:00
2022-11-30 08:50:05 +00:00
return backoff.RetryNotify(safe.OperationWithRecover(operation),
2020-02-10 15:03:39 +00:00
backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctx), notify)
}
2022-08-11 13:42:07 +00:00
func (p *Provider) buildConfiguration(ctx context.Context) (*dynamic.Configuration, error) {
pairs, err := p.kvClient.List(ctx, p.RootKey, nil)
if err != nil {
if errors.Is(err, store.ErrKeyNotFound) {
// This empty configuration satisfies the pkg/server/configurationwatcher.go isEmptyConfiguration func constraints,
// and will not be discarded by the configuration watcher.
return &dynamic.Configuration{
HTTP: &dynamic.HTTPConfiguration{
Routers: make(map[string]*dynamic.Router),
},
}, nil
}
return nil, err
}
cfg := &dynamic.Configuration{}
err = kv.Decode(pairs, cfg, p.RootKey)
if err != nil {
return nil, err
}
return cfg, nil
}
2022-09-12 15:40:09 +00:00
func (p *Provider) createKVClient(ctx context.Context, storeType string, config valkeyrie.Config) (store.Store, error) {
kvStore, err := valkeyrie.NewStore(ctx, storeType, p.Endpoints, config)
if err != nil {
return nil, err
}
return &storeWrapper{Store: kvStore}, nil
}