263 lines
7.8 KiB
Go
263 lines
7.8 KiB
Go
package kv
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"text/template"
|
|
"time"
|
|
|
|
"github.com/BurntSushi/ty/fun"
|
|
"github.com/cenk/backoff"
|
|
"github.com/containous/traefik/job"
|
|
"github.com/containous/traefik/log"
|
|
"github.com/containous/traefik/provider"
|
|
"github.com/containous/traefik/safe"
|
|
"github.com/containous/traefik/types"
|
|
"github.com/docker/libkv"
|
|
"github.com/docker/libkv/store"
|
|
)
|
|
|
|
// Provider holds common configurations of key-value providers.
|
|
type Provider struct {
|
|
provider.BaseProvider `mapstructure:",squash" export:"true"`
|
|
Endpoint string `description:"Comma separated server endpoints"`
|
|
Prefix string `description:"Prefix used for KV store" export:"true"`
|
|
TLS *types.ClientTLS `description:"Enable TLS support" export:"true"`
|
|
Username string `description:"KV Username"`
|
|
Password string `description:"KV Password"`
|
|
storeType store.Backend
|
|
kvclient store.Store
|
|
}
|
|
|
|
// CreateStore create the K/V store
|
|
func (p *Provider) CreateStore() (store.Store, error) {
|
|
storeConfig := &store.Config{
|
|
ConnectionTimeout: 30 * time.Second,
|
|
Bucket: "traefik",
|
|
Username: p.Username,
|
|
Password: p.Password,
|
|
}
|
|
|
|
if p.TLS != nil {
|
|
var err error
|
|
storeConfig.TLS, err = p.TLS.CreateTLSConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return libkv.NewStore(
|
|
p.storeType,
|
|
strings.Split(p.Endpoint, ","),
|
|
storeConfig,
|
|
)
|
|
}
|
|
|
|
// SetStoreType storeType setter
|
|
func (p *Provider) SetStoreType(storeType store.Backend) {
|
|
p.storeType = storeType
|
|
}
|
|
|
|
// SetKVClient kvclient setter
|
|
func (p *Provider) SetKVClient(kvClient store.Store) {
|
|
p.kvclient = kvClient
|
|
}
|
|
|
|
func (p *Provider) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) error {
|
|
operation := func() error {
|
|
events, err := p.kvclient.WatchTree(p.Prefix, make(chan struct{}))
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to KV WatchTree: %v", err)
|
|
}
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
return nil
|
|
case _, ok := <-events:
|
|
if !ok {
|
|
return errors.New("watchtree channel closed")
|
|
}
|
|
configuration := p.loadConfig()
|
|
if configuration != nil {
|
|
configurationChan <- types.ConfigMessage{
|
|
ProviderName: string(p.storeType),
|
|
Configuration: configuration,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
notify := func(err error, time time.Duration) {
|
|
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
|
|
}
|
|
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
|
|
if err != nil {
|
|
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Provide provides the configuration to traefik via the configuration channel
|
|
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
|
|
p.Constraints = append(p.Constraints, constraints...)
|
|
operation := func() error {
|
|
if _, err := p.kvclient.Exists("qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"); err != nil {
|
|
return fmt.Errorf("Failed to test KV store connection: %v", err)
|
|
}
|
|
if p.Watch {
|
|
pool.Go(func(stop chan bool) {
|
|
err := p.watchKv(configurationChan, p.Prefix, stop)
|
|
if err != nil {
|
|
log.Errorf("Cannot watch KV store: %v", err)
|
|
}
|
|
})
|
|
}
|
|
configuration := p.loadConfig()
|
|
configurationChan <- types.ConfigMessage{
|
|
ProviderName: string(p.storeType),
|
|
Configuration: configuration,
|
|
}
|
|
return nil
|
|
}
|
|
notify := func(err error, time time.Duration) {
|
|
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
|
|
}
|
|
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
|
|
if err != nil {
|
|
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Provider) loadConfig() *types.Configuration {
|
|
templateObjects := struct {
|
|
Prefix string
|
|
}{
|
|
// Allow `/traefik/alias` to superesede `p.Prefix`
|
|
strings.TrimSuffix(p.get(p.Prefix, p.Prefix+"/alias"), "/"),
|
|
}
|
|
|
|
var KvFuncMap = template.FuncMap{
|
|
"List": p.list,
|
|
"ListServers": p.listServers,
|
|
"Get": p.get,
|
|
"SplitGet": p.splitGet,
|
|
"Last": p.last,
|
|
"getSticky": p.getSticky,
|
|
"hasStickinessLabel": p.hasStickinessLabel,
|
|
"getStickinessCookieName": p.getStickinessCookieName,
|
|
}
|
|
|
|
configuration, err := p.GetConfiguration("templates/kv.tmpl", KvFuncMap, templateObjects)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
|
|
for key, frontend := range configuration.Frontends {
|
|
if _, ok := configuration.Backends[frontend.Backend]; !ok {
|
|
delete(configuration.Frontends, key)
|
|
}
|
|
}
|
|
|
|
return configuration
|
|
}
|
|
|
|
func (p *Provider) list(keys ...string) []string {
|
|
joinedKeys := strings.Join(keys, "")
|
|
keysPairs, err := p.kvclient.List(joinedKeys)
|
|
if err != nil {
|
|
log.Debugf("Cannot get keys %s %s ", joinedKeys, err)
|
|
return nil
|
|
}
|
|
directoryKeys := make(map[string]string)
|
|
for _, key := range keysPairs {
|
|
directory := strings.Split(strings.TrimPrefix(key.Key, joinedKeys), "/")[0]
|
|
directoryKeys[directory] = joinedKeys + directory
|
|
}
|
|
return fun.Values(directoryKeys).([]string)
|
|
}
|
|
|
|
func (p *Provider) listServers(backend string) []string {
|
|
serverNames := p.list(backend, "/servers/")
|
|
return fun.Filter(func(serverName string) bool {
|
|
key := fmt.Sprint(serverName, "/url")
|
|
if _, err := p.kvclient.Get(key); err != nil {
|
|
if err != store.ErrKeyNotFound {
|
|
log.Errorf("Failed to retrieve value for key %s: %s", key, err)
|
|
}
|
|
return false
|
|
}
|
|
return p.checkConstraints(serverName, "/tags")
|
|
}, serverNames).([]string)
|
|
}
|
|
|
|
func (p *Provider) get(defaultValue string, keys ...string) string {
|
|
joinedKeys := strings.Join(keys, "")
|
|
keyPair, err := p.kvclient.Get(strings.TrimPrefix(joinedKeys, "/"))
|
|
if err != nil {
|
|
log.Debugf("Cannot get key %s %s, setting default %s", joinedKeys, err, defaultValue)
|
|
return defaultValue
|
|
} else if keyPair == nil {
|
|
log.Debugf("Cannot get key %s, setting default %s", joinedKeys, defaultValue)
|
|
return defaultValue
|
|
}
|
|
return string(keyPair.Value)
|
|
}
|
|
|
|
func (p *Provider) splitGet(keys ...string) []string {
|
|
joinedKeys := strings.Join(keys, "")
|
|
keyPair, err := p.kvclient.Get(joinedKeys)
|
|
if err != nil {
|
|
log.Debugf("Cannot get key %s %s, setting default empty", joinedKeys, err)
|
|
return []string{}
|
|
} else if keyPair == nil {
|
|
log.Debugf("Cannot get key %s, setting default %empty", joinedKeys)
|
|
return []string{}
|
|
}
|
|
return strings.Split(string(keyPair.Value), ",")
|
|
}
|
|
|
|
func (p *Provider) last(key string) string {
|
|
splittedKey := strings.Split(key, "/")
|
|
return splittedKey[len(splittedKey)-1]
|
|
}
|
|
|
|
func (p *Provider) checkConstraints(keys ...string) bool {
|
|
joinedKeys := strings.Join(keys, "")
|
|
keyPair, err := p.kvclient.Get(joinedKeys)
|
|
|
|
value := ""
|
|
if err == nil && keyPair != nil && keyPair.Value != nil {
|
|
value = string(keyPair.Value)
|
|
}
|
|
|
|
constraintTags := strings.Split(value, ",")
|
|
ok, failingConstraint := p.MatchConstraints(constraintTags)
|
|
if !ok {
|
|
if failingConstraint != nil {
|
|
log.Debugf("Constraint %v not matching with following tags: %v", failingConstraint.String(), value)
|
|
}
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (p *Provider) getSticky(rootPath string) string {
|
|
stickyValue := p.get("", rootPath, "/loadbalancer", "/sticky")
|
|
if len(stickyValue) > 0 {
|
|
log.Warnf("Deprecated configuration found: %s. Please use %s.", "loadbalancer/sticky", "loadbalancer/stickiness")
|
|
} else {
|
|
stickyValue = "false"
|
|
}
|
|
return stickyValue
|
|
}
|
|
|
|
func (p *Provider) hasStickinessLabel(rootPath string) bool {
|
|
stickinessValue := p.get("false", rootPath, "/loadbalancer", "/stickiness")
|
|
return len(stickinessValue) > 0 && strings.EqualFold(strings.TrimSpace(stickinessValue), "true")
|
|
}
|
|
|
|
func (p *Provider) getStickinessCookieName(rootPath string) string {
|
|
return p.get("", rootPath, "/loadbalancer", "/stickiness", "/cookiename")
|
|
}
|