Merge pull request #1444 from vdemeester/extract-providers

Extract providers to their own packages
This commit is contained in:
Vincent Demeester 2017-04-18 22:54:49 +02:00 committed by GitHub
commit f03a9e502f
36 changed files with 861 additions and 833 deletions

View file

@ -11,8 +11,19 @@ import (
"github.com/containous/flaeg"
"github.com/containous/traefik/acme"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/provider/boltdb"
"github.com/containous/traefik/provider/consul"
"github.com/containous/traefik/provider/docker"
"github.com/containous/traefik/provider/dynamodb"
"github.com/containous/traefik/provider/ecs"
"github.com/containous/traefik/provider/etcd"
"github.com/containous/traefik/provider/eureka"
"github.com/containous/traefik/provider/file"
"github.com/containous/traefik/provider/kubernetes"
"github.com/containous/traefik/provider/marathon"
"github.com/containous/traefik/provider/mesos"
"github.com/containous/traefik/provider/rancher"
"github.com/containous/traefik/provider/zk"
"github.com/containous/traefik/types"
)
@ -42,20 +53,20 @@ type GlobalConfiguration struct {
InsecureSkipVerify bool `description:"Disable SSL certificate verification"`
Retry *Retry `description:"Enable retry sending request if network error"`
Docker *docker.Provider `description:"Enable Docker backend"`
File *provider.File `description:"Enable File backend"`
File *file.Provider `description:"Enable File backend"`
Web *WebProvider `description:"Enable Web backend"`
Marathon *provider.Marathon `description:"Enable Marathon backend"`
Consul *provider.Consul `description:"Enable Consul backend"`
ConsulCatalog *provider.ConsulCatalog `description:"Enable Consul catalog backend"`
Etcd *provider.Etcd `description:"Enable Etcd backend"`
Zookeeper *provider.Zookepper `description:"Enable Zookeeper backend"`
Boltdb *provider.BoltDb `description:"Enable Boltdb backend"`
Kubernetes *provider.Kubernetes `description:"Enable Kubernetes backend"`
Mesos *provider.Mesos `description:"Enable Mesos backend"`
Eureka *provider.Eureka `description:"Enable Eureka backend"`
ECS *provider.ECS `description:"Enable ECS backend"`
Rancher *provider.Rancher `description:"Enable Rancher backend"`
DynamoDB *provider.DynamoDB `description:"Enable DynamoDB backend"`
Marathon *marathon.Provider `description:"Enable Marathon backend"`
Consul *consul.Provider `description:"Enable Consul backend"`
ConsulCatalog *consul.CatalogProvider `description:"Enable Consul catalog backend"`
Etcd *etcd.Provider `description:"Enable Etcd backend"`
Zookeeper *zk.Provider `description:"Enable Zookeeper backend"`
Boltdb *boltdb.Provider `description:"Enable Boltdb backend"`
Kubernetes *kubernetes.Provider `description:"Enable Kubernetes backend"`
Mesos *mesos.Provider `description:"Enable Mesos backend"`
Eureka *eureka.Provider `description:"Enable Eureka backend"`
ECS *ecs.Provider `description:"Enable ECS backend"`
Rancher *rancher.Provider `description:"Enable Rancher backend"`
DynamoDB *dynamodb.Provider `description:"Enable DynamoDB backend"`
}
// DefaultEntryPoints holds default entry points
@ -336,7 +347,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
defaultDocker.SwarmMode = false
// default File
var defaultFile provider.File
var defaultFile file.Provider
defaultFile.Watch = true
defaultFile.Filename = "" //needs equivalent to viper.ConfigFileUsed()
@ -355,7 +366,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
}
// default Marathon
var defaultMarathon provider.Marathon
var defaultMarathon marathon.Provider
defaultMarathon.Watch = true
defaultMarathon.Endpoint = "http://127.0.0.1:8080"
defaultMarathon.ExposedByDefault = true
@ -364,47 +375,47 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
defaultMarathon.KeepAlive = 10
// default Consul
var defaultConsul provider.Consul
var defaultConsul consul.Provider
defaultConsul.Watch = true
defaultConsul.Endpoint = "127.0.0.1:8500"
defaultConsul.Prefix = "traefik"
defaultConsul.Constraints = types.Constraints{}
// default ConsulCatalog
var defaultConsulCatalog provider.ConsulCatalog
// default CatalogProvider
var defaultConsulCatalog consul.CatalogProvider
defaultConsulCatalog.Endpoint = "127.0.0.1:8500"
defaultConsulCatalog.Constraints = types.Constraints{}
// default Etcd
var defaultEtcd provider.Etcd
var defaultEtcd etcd.Provider
defaultEtcd.Watch = true
defaultEtcd.Endpoint = "127.0.0.1:2379"
defaultEtcd.Prefix = "/traefik"
defaultEtcd.Constraints = types.Constraints{}
//default Zookeeper
var defaultZookeeper provider.Zookepper
var defaultZookeeper zk.Provider
defaultZookeeper.Watch = true
defaultZookeeper.Endpoint = "127.0.0.1:2181"
defaultZookeeper.Prefix = "/traefik"
defaultZookeeper.Constraints = types.Constraints{}
//default Boltdb
var defaultBoltDb provider.BoltDb
var defaultBoltDb boltdb.Provider
defaultBoltDb.Watch = true
defaultBoltDb.Endpoint = "127.0.0.1:4001"
defaultBoltDb.Prefix = "/traefik"
defaultBoltDb.Constraints = types.Constraints{}
//default Kubernetes
var defaultKubernetes provider.Kubernetes
//default Provider
var defaultKubernetes kubernetes.Provider
defaultKubernetes.Watch = true
defaultKubernetes.Endpoint = ""
defaultKubernetes.LabelSelector = ""
defaultKubernetes.Constraints = types.Constraints{}
// default Mesos
var defaultMesos provider.Mesos
var defaultMesos mesos.Provider
defaultMesos.Watch = true
defaultMesos.Endpoint = "http://127.0.0.1:5050"
defaultMesos.ExposedByDefault = true
@ -414,7 +425,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
defaultMesos.StateTimeoutSecond = 30
//default ECS
var defaultECS provider.ECS
var defaultECS ecs.Provider
defaultECS.Watch = true
defaultECS.ExposedByDefault = true
defaultECS.RefreshSeconds = 15
@ -422,12 +433,12 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
defaultECS.Constraints = types.Constraints{}
//default Rancher
var defaultRancher provider.Rancher
var defaultRancher rancher.Provider
defaultRancher.Watch = true
defaultRancher.ExposedByDefault = true
// default DynamoDB
var defaultDynamoDB provider.DynamoDB
var defaultDynamoDB dynamodb.Provider
defaultDynamoDB.Constraints = types.Constraints{}
defaultDynamoDB.RefreshSeconds = 15
defaultDynamoDB.TableName = "traefik"

View file

@ -1,26 +1,25 @@
package main
import (
"context"
"errors"
"io/ioutil"
"net/http"
"os"
"os/exec"
"strings"
"sync"
"time"
"context"
"github.com/containous/staert"
"github.com/containous/traefik/cluster"
"github.com/containous/traefik/integration/utils"
"github.com/containous/traefik/provider"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/go-check/check"
"errors"
"github.com/containous/traefik/cluster"
"github.com/containous/traefik/integration/utils"
"github.com/containous/traefik/provider"
checker "github.com/vdemeester/shakers"
"io/ioutil"
"os"
"strings"
"sync"
)
// Consul test suites (using libcompose)

View file

@ -1,35 +0,0 @@
package provider
import (
"fmt"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/boltdb"
)
var _ Provider = (*BoltDb)(nil)
// BoltDb holds configurations of the BoltDb provider.
type BoltDb struct {
Kv `mapstructure:",squash"`
}
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *BoltDb) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
store, err := provider.CreateStore()
if err != nil {
return fmt.Errorf("Failed to Connect to KV store: %v", err)
}
provider.kvclient = store
return provider.provide(configurationChan, pool, constraints)
}
// CreateStore creates the KV store
func (provider *BoltDb) CreateStore() (store.Store, error) {
provider.storeType = store.BOLTDB
boltdb.Register()
return provider.createStore()
}

37
provider/boltdb/boltdb.go Normal file
View file

@ -0,0 +1,37 @@
package boltdb
import (
"fmt"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/provider/kv"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/boltdb"
)
var _ provider.Provider = (*Provider)(nil)
// Provider holds configurations of the provider.
type Provider struct {
kv.Provider `mapstructure:",squash"`
}
// Provide allows the boltdb provider to Provide configurations to traefik
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
store, err := p.CreateStore()
if err != nil {
return fmt.Errorf("Failed to Connect to KV store: %v", err)
}
p.Kvclient = store
return p.Provider.Provide(configurationChan, pool, constraints)
}
// CreateStore creates the KV store
func (p *Provider) CreateStore() (store.Store, error) {
p.StoreType = store.BOLTDB
boltdb.Register()
return p.Provider.CreateStore()
}

View file

@ -1,35 +0,0 @@
package provider
import (
"fmt"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
)
var _ Provider = (*Consul)(nil)
// Consul holds configurations of the Consul provider.
type Consul struct {
Kv `mapstructure:",squash"`
}
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Consul) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
store, err := provider.CreateStore()
if err != nil {
return fmt.Errorf("Failed to Connect to KV store: %v", err)
}
provider.kvclient = store
return provider.provide(configurationChan, pool, constraints)
}
// CreateStore creates the KV store
func (provider *Consul) CreateStore() (store.Store, error) {
provider.storeType = store.CONSUL
consul.Register()
return provider.createStore()
}

37
provider/consul/consul.go Normal file
View file

@ -0,0 +1,37 @@
package consul
import (
"fmt"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/provider/kv"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
)
var _ provider.Provider = (*Provider)(nil)
// Provider holds configurations of the p.
type Provider struct {
kv.Provider `mapstructure:",squash"`
}
// Provide allows the consul provider to provide configurations to traefik
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
store, err := p.CreateStore()
if err != nil {
return fmt.Errorf("Failed to Connect to KV store: %v", err)
}
p.Kvclient = store
return p.Provider.Provide(configurationChan, pool, constraints)
}
// CreateStore creates the KV store
func (p *Provider) CreateStore() (store.Store, error) {
p.StoreType = store.CONSUL
consul.Register()
return p.Provider.CreateStore()
}

View file

@ -1,4 +1,4 @@
package provider
package consul
import (
"errors"
@ -13,6 +13,7 @@ import (
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/hashicorp/consul/api"
@ -25,15 +26,15 @@ const (
DefaultConsulCatalogTagPrefix = "traefik"
)
var _ Provider = (*ConsulCatalog)(nil)
var _ provider.Provider = (*CatalogProvider)(nil)
// ConsulCatalog holds configurations of the Consul catalog provider.
type ConsulCatalog struct {
BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Consul server endpoint"`
Domain string `description:"Default domain used"`
client *api.Client
Prefix string
// CatalogProvider holds configurations of the Consul catalog provider.
type CatalogProvider struct {
provider.BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Consul server endpoint"`
Domain string `description:"Default domain used"`
client *api.Client
Prefix string
}
type serviceUpdate struct {
@ -75,10 +76,10 @@ func (a nodeSorter) Less(i int, j int) bool {
return lentr.Service.Port < rentr.Service.Port
}
func (provider *ConsulCatalog) watchServices(stopCh <-chan struct{}) <-chan map[string][]string {
func (p *CatalogProvider) watchServices(stopCh <-chan struct{}) <-chan map[string][]string {
watchCh := make(chan map[string][]string)
catalog := provider.client.Catalog()
catalog := p.client.Catalog()
safe.Go(func() {
defer close(watchCh)
@ -114,8 +115,8 @@ func (provider *ConsulCatalog) watchServices(stopCh <-chan struct{}) <-chan map[
return watchCh
}
func (provider *ConsulCatalog) healthyNodes(service string) (catalogUpdate, error) {
health := provider.client.Health()
func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) {
health := p.client.Health()
opts := &api.QueryOptions{}
data, _, err := health.Service(service, "", true, opts)
if err != nil {
@ -124,8 +125,8 @@ func (provider *ConsulCatalog) healthyNodes(service string) (catalogUpdate, erro
}
nodes := fun.Filter(func(node *api.ServiceEntry) bool {
constraintTags := provider.getContraintTags(node.Service.Tags)
ok, failingConstraint := provider.MatchConstraints(constraintTags)
constraintTags := p.getContraintTags(node.Service.Tags)
ok, failingConstraint := p.MatchConstraints(constraintTags)
if ok == false && failingConstraint != nil {
log.Debugf("Service %v pruned by '%v' constraint", service, failingConstraint.String())
}
@ -149,34 +150,34 @@ func (provider *ConsulCatalog) healthyNodes(service string) (catalogUpdate, erro
}, nil
}
func (provider *ConsulCatalog) getEntryPoints(list string) []string {
func (p *CatalogProvider) getEntryPoints(list string) []string {
return strings.Split(list, ",")
}
func (provider *ConsulCatalog) getBackend(node *api.ServiceEntry) string {
func (p *CatalogProvider) getBackend(node *api.ServiceEntry) string {
return strings.ToLower(node.Service.Service)
}
func (provider *ConsulCatalog) getFrontendRule(service serviceUpdate) string {
customFrontendRule := provider.getAttribute("frontend.rule", service.Attributes, "")
func (p *CatalogProvider) getFrontendRule(service serviceUpdate) string {
customFrontendRule := p.getAttribute("frontend.rule", service.Attributes, "")
if customFrontendRule != "" {
return customFrontendRule
}
return "Host:" + service.ServiceName + "." + provider.Domain
return "Host:" + service.ServiceName + "." + p.Domain
}
func (provider *ConsulCatalog) getBackendAddress(node *api.ServiceEntry) string {
func (p *CatalogProvider) getBackendAddress(node *api.ServiceEntry) string {
if node.Service.Address != "" {
return node.Service.Address
}
return node.Node.Address
}
func (provider *ConsulCatalog) getBackendName(node *api.ServiceEntry, index int) string {
func (p *CatalogProvider) getBackendName(node *api.ServiceEntry, index int) string {
serviceName := strings.ToLower(node.Service.Service) + "--" + node.Service.Address + "--" + strconv.Itoa(node.Service.Port)
for _, tag := range node.Service.Tags {
serviceName += "--" + Normalize(tag)
serviceName += "--" + provider.Normalize(tag)
}
serviceName = strings.Replace(serviceName, ".", "-", -1)
@ -187,7 +188,7 @@ func (provider *ConsulCatalog) getBackendName(node *api.ServiceEntry, index int)
return serviceName
}
func (provider *ConsulCatalog) getAttribute(name string, tags []string, defaultValue string) string {
func (p *CatalogProvider) getAttribute(name string, tags []string, defaultValue string) string {
for _, tag := range tags {
if strings.Index(strings.ToLower(tag), DefaultConsulCatalogTagPrefix+".") == 0 {
if kv := strings.SplitN(tag[len(DefaultConsulCatalogTagPrefix+"."):], "=", 2); len(kv) == 2 && strings.ToLower(kv[0]) == strings.ToLower(name) {
@ -198,7 +199,7 @@ func (provider *ConsulCatalog) getAttribute(name string, tags []string, defaultV
return defaultValue
}
func (provider *ConsulCatalog) getContraintTags(tags []string) []string {
func (p *CatalogProvider) getContraintTags(tags []string) []string {
var list []string
for _, tag := range tags {
@ -211,22 +212,22 @@ func (provider *ConsulCatalog) getContraintTags(tags []string) []string {
return list
}
func (provider *ConsulCatalog) buildConfig(catalog []catalogUpdate) *types.Configuration {
func (p *CatalogProvider) buildConfig(catalog []catalogUpdate) *types.Configuration {
var FuncMap = template.FuncMap{
"getBackend": provider.getBackend,
"getFrontendRule": provider.getFrontendRule,
"getBackendName": provider.getBackendName,
"getBackendAddress": provider.getBackendAddress,
"getAttribute": provider.getAttribute,
"getEntryPoints": provider.getEntryPoints,
"hasMaxconnAttributes": provider.hasMaxconnAttributes,
"getBackend": p.getBackend,
"getFrontendRule": p.getFrontendRule,
"getBackendName": p.getBackendName,
"getBackendAddress": p.getBackendAddress,
"getAttribute": p.getAttribute,
"getEntryPoints": p.getEntryPoints,
"hasMaxconnAttributes": p.hasMaxconnAttributes,
}
allNodes := []*api.ServiceEntry{}
services := []*serviceUpdate{}
for _, info := range catalog {
for _, node := range info.Nodes {
isEnabled := provider.getAttribute("enable", node.Service.Tags, "true")
isEnabled := p.getAttribute("enable", node.Service.Tags, "true")
if isEnabled != "false" && len(info.Nodes) > 0 {
services = append(services, info.Service)
allNodes = append(allNodes, info.Nodes...)
@ -246,7 +247,7 @@ func (provider *ConsulCatalog) buildConfig(catalog []catalogUpdate) *types.Confi
Nodes: allNodes,
}
configuration, err := provider.GetConfiguration("templates/consul_catalog.tmpl", FuncMap, templateObjects)
configuration, err := p.GetConfiguration("templates/consul_catalog.tmpl", FuncMap, templateObjects)
if err != nil {
log.WithError(err).Error("Failed to create config")
}
@ -254,16 +255,16 @@ func (provider *ConsulCatalog) buildConfig(catalog []catalogUpdate) *types.Confi
return configuration
}
func (provider *ConsulCatalog) hasMaxconnAttributes(attributes []string) bool {
amount := provider.getAttribute("backend.maxconn.amount", attributes, "")
extractorfunc := provider.getAttribute("backend.maxconn.extractorfunc", attributes, "")
func (p *CatalogProvider) hasMaxconnAttributes(attributes []string) bool {
amount := p.getAttribute("backend.maxconn.amount", attributes, "")
extractorfunc := p.getAttribute("backend.maxconn.extractorfunc", attributes, "")
if amount != "" && extractorfunc != "" {
return true
}
return false
}
func (provider *ConsulCatalog) getNodes(index map[string][]string) ([]catalogUpdate, error) {
func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate, error) {
visited := make(map[string]bool)
nodes := []catalogUpdate{}
@ -274,7 +275,7 @@ func (provider *ConsulCatalog) getNodes(index map[string][]string) ([]catalogUpd
log.WithFields(logrus.Fields{
"service": name,
}).Debug("Fetching service")
healthy, err := provider.healthyNodes(name)
healthy, err := p.healthyNodes(name)
if err != nil {
return nil, err
}
@ -287,9 +288,9 @@ func (provider *ConsulCatalog) getNodes(index map[string][]string) ([]catalogUpd
return nodes, nil
}
func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error {
func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error {
stopCh := make(chan struct{})
serviceCatalog := provider.watchServices(stopCh)
serviceCatalog := p.watchServices(stopCh)
defer close(stopCh)
@ -302,11 +303,11 @@ func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessag
return errors.New("Consul service list nil")
}
log.Debug("List of services changed")
nodes, err := provider.getNodes(index)
nodes, err := p.getNodes(index)
if err != nil {
return err
}
configuration := provider.buildConfig(nodes)
configuration := p.buildConfig(nodes)
configurationChan <- types.ConfigMessage{
ProviderName: "consul_catalog",
Configuration: configuration,
@ -315,24 +316,24 @@ func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessag
}
}
// Provide allows the provider to provide configurations to traefik
// Provide allows the consul catalog provider to provide configurations to traefik
// using the given configuration channel.
func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
func (p *CatalogProvider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
config := api.DefaultConfig()
config.Address = provider.Endpoint
config.Address = p.Endpoint
client, err := api.NewClient(config)
if err != nil {
return err
}
provider.client = client
provider.Constraints = append(provider.Constraints, constraints...)
p.client = client
p.Constraints = append(p.Constraints, constraints...)
pool.Go(func(stop chan bool) {
notify := func(err error, time time.Duration) {
log.Errorf("Consul connection error %+v, retrying in %s", err, time)
}
operation := func() error {
return provider.watch(configurationChan, stop)
return p.watch(configurationChan, stop)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {

View file

@ -1,4 +1,4 @@
package provider
package consul
import (
"reflect"
@ -10,7 +10,7 @@ import (
)
func TestConsulCatalogGetFrontendRule(t *testing.T) {
provider := &ConsulCatalog{
provider := &CatalogProvider{
Domain: "localhost",
}
@ -45,7 +45,7 @@ func TestConsulCatalogGetFrontendRule(t *testing.T) {
}
func TestConsulCatalogGetAttribute(t *testing.T) {
provider := &ConsulCatalog{
provider := &CatalogProvider{
Domain: "localhost",
}
@ -84,7 +84,7 @@ func TestConsulCatalogGetAttribute(t *testing.T) {
}
func TestConsulCatalogGetBackendAddress(t *testing.T) {
provider := &ConsulCatalog{
provider := &CatalogProvider{
Domain: "localhost",
}
@ -125,7 +125,7 @@ func TestConsulCatalogGetBackendAddress(t *testing.T) {
}
func TestConsulCatalogGetBackendName(t *testing.T) {
provider := &ConsulCatalog{
provider := &CatalogProvider{
Domain: "localhost",
}
@ -177,7 +177,7 @@ func TestConsulCatalogGetBackendName(t *testing.T) {
}
func TestConsulCatalogBuildConfig(t *testing.T) {
provider := &ConsulCatalog{
provider := &CatalogProvider{
Domain: "localhost",
}

View file

@ -41,15 +41,15 @@ const (
var _ provider.Provider = (*Provider)(nil)
// Provider holds configurations of the Provider.
// Provider holds configurations of the provider.
type Provider struct {
provider.BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Provider server endpoint. Can be a tcp or a unix socket endpoint"`
Endpoint string `description:"Docker server endpoint. Can be a tcp or a unix socket endpoint"`
Domain string `description:"Default domain used"`
TLS *provider.ClientTLS `description:"Enable Provider TLS support"`
TLS *provider.ClientTLS `description:"Enable Docker TLS support"`
ExposedByDefault bool `description:"Expose containers by default"`
UseBindPortIP bool `description:"Use the ip address from the bound port, rather than from the inner network"`
SwarmMode bool `description:"Use Provider on Swarm Mode"`
SwarmMode bool `description:"Use Docker on Swarm Mode"`
}
// dockerData holds the need data to the Provider p
@ -112,7 +112,7 @@ func (p *Provider) createClient() (client.APIClient, error) {
}
// Provide allows the p to provide configurations to traefik
// Provide allows the docker provider to provide configurations to traefik
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
p.Constraints = append(p.Constraints, constraints...)

View file

@ -1,4 +1,4 @@
package provider
package dynamodb
import (
"context"
@ -15,15 +15,16 @@ import (
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
)
var _ Provider = (*DynamoDB)(nil)
var _ provider.Provider = (*Provider)(nil)
// DynamoDB holds configuration for DynamoDB provider.
type DynamoDB struct {
BaseProvider `mapstructure:",squash"`
// Provider holds configuration for provider.
type Provider struct {
provider.BaseProvider `mapstructure:",squash"`
AccessKeyID string `description:"The AWS credentials access key to use for making requests"`
RefreshSeconds int `description:"Polling interval (in seconds)"`
@ -38,20 +39,20 @@ type dynamoClient struct {
}
// createClient configures aws credentials and creates a dynamoClient
func (provider *DynamoDB) createClient() (*dynamoClient, error) {
log.Infof("Creating DynamoDB client...")
func (p *Provider) createClient() (*dynamoClient, error) {
log.Infof("Creating Provider client...")
sess := session.New()
if provider.Region == "" {
return nil, errors.New("no Region provided for DynamoDB")
if p.Region == "" {
return nil, errors.New("no Region provided for Provider")
}
cfg := &aws.Config{
Region: &provider.Region,
Region: &p.Region,
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&credentials.StaticProvider{
Value: credentials.Value{
AccessKeyID: provider.AccessKeyID,
SecretAccessKey: provider.SecretAccessKey,
AccessKeyID: p.AccessKeyID,
SecretAccessKey: p.SecretAccessKey,
},
},
&credentials.EnvProvider{},
@ -60,8 +61,8 @@ func (provider *DynamoDB) createClient() (*dynamoClient, error) {
}),
}
if provider.Endpoint != "" {
cfg.Endpoint = aws.String(provider.Endpoint)
if p.Endpoint != "" {
cfg.Endpoint = aws.String(p.Endpoint)
}
return &dynamoClient{
@ -70,10 +71,10 @@ func (provider *DynamoDB) createClient() (*dynamoClient, error) {
}
// scanTable scans the given table and returns slice of all items in the table
func (provider *DynamoDB) scanTable(client *dynamoClient) ([]map[string]*dynamodb.AttributeValue, error) {
log.Debugf("Scanning DynamoDB table: %s ...", provider.TableName)
func (p *Provider) scanTable(client *dynamoClient) ([]map[string]*dynamodb.AttributeValue, error) {
log.Debugf("Scanning Provider table: %s ...", p.TableName)
params := &dynamodb.ScanInput{
TableName: aws.String(provider.TableName),
TableName: aws.String(p.TableName),
}
items := make([]map[string]*dynamodb.AttributeValue, 0)
err := client.db.ScanPages(params,
@ -82,49 +83,49 @@ func (provider *DynamoDB) scanTable(client *dynamoClient) ([]map[string]*dynamod
return !lastPage
})
if err != nil {
log.Errorf("Failed to scan DynamoDB table %s", provider.TableName)
log.Errorf("Failed to scan Provider table %s", p.TableName)
return nil, err
}
log.Debugf("Successfully scanned DynamoDB table %s", provider.TableName)
log.Debugf("Successfully scanned Provider table %s", p.TableName)
return items, nil
}
// loadDynamoConfig retrieves items from dynamodb and converts them into Backends and Frontends in a Configuration
func (provider *DynamoDB) loadDynamoConfig(client *dynamoClient) (*types.Configuration, error) {
items, err := provider.scanTable(client)
func (p *Provider) loadDynamoConfig(client *dynamoClient) (*types.Configuration, error) {
items, err := p.scanTable(client)
if err != nil {
return nil, err
}
log.Debugf("Number of Items retrieved from DynamoDB: %d", len(items))
log.Debugf("Number of Items retrieved from Provider: %d", len(items))
backends := make(map[string]*types.Backend)
frontends := make(map[string]*types.Frontend)
// unmarshal dynamoAttributes into Backends and Frontends
for i, item := range items {
log.Debugf("DynamoDB Item: %d\n%v", i, item)
log.Debugf("Provider Item: %d\n%v", i, item)
// verify the type of each item by checking to see if it has
// the corresponding type, backend or frontend map
if backend, exists := item["backend"]; exists {
log.Debugf("Unmarshaling backend from DynamoDB...")
log.Debugf("Unmarshaling backend from Provider...")
tmpBackend := &types.Backend{}
err = dynamodbattribute.Unmarshal(backend, tmpBackend)
if err != nil {
log.Errorf(err.Error())
} else {
backends[*item["name"].S] = tmpBackend
log.Debugf("Backend from DynamoDB unmarshalled successfully")
log.Debugf("Backend from Provider unmarshalled successfully")
}
} else if frontend, exists := item["frontend"]; exists {
log.Debugf("Unmarshaling frontend from DynamoDB...")
log.Debugf("Unmarshaling frontend from Provider...")
tmpFrontend := &types.Frontend{}
err = dynamodbattribute.Unmarshal(frontend, tmpFrontend)
if err != nil {
log.Errorf(err.Error())
} else {
frontends[*item["name"].S] = tmpFrontend
log.Debugf("Frontend from DynamoDB unmarshalled successfully")
log.Debugf("Frontend from Provider unmarshalled successfully")
}
} else {
log.Warnf("Error in format of DynamoDB Item: %v", item)
log.Warnf("Error in format of Provider Item: %v", item)
}
}
@ -136,9 +137,9 @@ func (provider *DynamoDB) loadDynamoConfig(client *dynamoClient) (*types.Configu
// Provide provides the configuration to traefik via the configuration channel
// if watch is enabled it polls dynamodb
func (provider *DynamoDB) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
log.Debugf("Providing DynamoDB...")
provider.Constraints = append(provider.Constraints, constraints...)
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
log.Debugf("Providing Provider...")
p.Constraints = append(p.Constraints, constraints...)
handleCanceled := func(ctx context.Context, err error) error {
if ctx.Err() == context.Canceled || err == context.Canceled {
return nil
@ -156,12 +157,12 @@ func (provider *DynamoDB) Provide(configurationChan chan<- types.ConfigMessage,
}()
operation := func() error {
aws, err := provider.createClient()
aws, err := p.createClient()
if err != nil {
return handleCanceled(ctx, err)
}
configuration, err := provider.loadDynamoConfig(aws)
configuration, err := p.loadDynamoConfig(aws)
if err != nil {
return handleCanceled(ctx, err)
}
@ -171,14 +172,14 @@ func (provider *DynamoDB) Provide(configurationChan chan<- types.ConfigMessage,
Configuration: configuration,
}
if provider.Watch {
reload := time.NewTicker(time.Second * time.Duration(provider.RefreshSeconds))
if p.Watch {
reload := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
defer reload.Stop()
for {
log.Debugf("Watching DynamoDB...")
log.Debugf("Watching Provider...")
select {
case <-reload.C:
configuration, err := provider.loadDynamoConfig(aws)
configuration, err := p.loadDynamoConfig(aws)
if err != nil {
return handleCanceled(ctx, err)
}
@ -195,12 +196,12 @@ func (provider *DynamoDB) Provide(configurationChan chan<- types.ConfigMessage,
return nil
}
notify := func(err error, time time.Duration) {
log.Errorf("DynamoDB error: %s time: %v", err.Error(), time)
log.Errorf("Provider error: %s time: %v", err.Error(), time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Failed to connect to DynamoDB. %s", err.Error())
log.Errorf("Failed to connect to Provider. %s", err.Error())
}
})
return nil

View file

@ -1,4 +1,4 @@
package provider
package dynamodb
import (
"errors"
@ -90,7 +90,7 @@ func TestLoadDynamoConfigSuccessful(t *testing.T) {
testWithError: false,
},
}
provider := DynamoDB{}
provider := Provider{}
loadedConfig, err := provider.loadDynamoConfig(dbiface)
if err != nil {
t.Fatal(err)
@ -114,7 +114,7 @@ func TestLoadDynamoConfigFailure(t *testing.T) {
testWithError: true,
},
}
provider := DynamoDB{}
provider := Provider{}
_, err := provider.loadDynamoConfig(dbiface)
if err == nil {
t.Fatal("Expected error")
@ -122,7 +122,7 @@ func TestLoadDynamoConfigFailure(t *testing.T) {
}
func TestCreateClientSuccessful(t *testing.T) {
provider := DynamoDB{
provider := Provider{
Region: "us-east-1",
}
_, err := provider.createClient()
@ -132,7 +132,7 @@ func TestCreateClientSuccessful(t *testing.T) {
}
func TestCreateClientFailure(t *testing.T) {
provider := DynamoDB{}
provider := Provider{}
_, err := provider.createClient()
if err == nil {
t.Fatal("Expected error")

View file

@ -1,4 +1,4 @@
package provider
package ecs
import (
"context"
@ -20,21 +20,22 @@ import (
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
)
var _ Provider = (*ECS)(nil)
var _ provider.Provider = (*Provider)(nil)
// ECS holds configurations of the ECS provider.
type ECS struct {
BaseProvider `mapstructure:",squash"`
// Provider holds configurations of the provider.
type Provider struct {
provider.BaseProvider `mapstructure:",squash"`
Domain string `description:"Default domain used"`
ExposedByDefault bool `description:"Expose containers by default"`
RefreshSeconds int `description:"Polling interval (in seconds)"`
// ECS lookup parameters
// Provider lookup parameters
Cluster string `description:"ECS Cluster Name"`
Region string `description:"The AWS region to use for requests"`
AccessKeyID string `description:"The AWS credentials access key to use for making requests"`
@ -56,26 +57,26 @@ type awsClient struct {
ec2 *ec2.EC2
}
func (provider *ECS) createClient() (*awsClient, error) {
func (p *Provider) createClient() (*awsClient, error) {
sess := session.New()
ec2meta := ec2metadata.New(sess)
if provider.Region == "" {
if p.Region == "" {
log.Infoln("No EC2 region provided, querying instance metadata endpoint...")
identity, err := ec2meta.GetInstanceIdentityDocument()
if err != nil {
return nil, err
}
provider.Region = identity.Region
p.Region = identity.Region
}
cfg := &aws.Config{
Region: &provider.Region,
Region: &p.Region,
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&credentials.StaticProvider{
Value: credentials.Value{
AccessKeyID: provider.AccessKeyID,
SecretAccessKey: provider.SecretAccessKey,
AccessKeyID: p.AccessKeyID,
SecretAccessKey: p.SecretAccessKey,
},
},
&credentials.EnvProvider{},
@ -90,11 +91,11 @@ func (provider *ECS) createClient() (*awsClient, error) {
}, nil
}
// Provide allows the provider to provide configurations to traefik
// Provide allows the ecs provider to provide configurations to traefik
// using the given configuration channel.
func (provider *ECS) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
provider.Constraints = append(provider.Constraints, constraints...)
p.Constraints = append(p.Constraints, constraints...)
handleCanceled := func(ctx context.Context, err error) error {
if ctx.Err() == context.Canceled || err == context.Canceled {
@ -113,12 +114,12 @@ func (provider *ECS) Provide(configurationChan chan<- types.ConfigMessage, pool
}()
operation := func() error {
aws, err := provider.createClient()
aws, err := p.createClient()
if err != nil {
return err
}
configuration, err := provider.loadECSConfig(ctx, aws)
configuration, err := p.loadECSConfig(ctx, aws)
if err != nil {
return handleCanceled(ctx, err)
}
@ -128,13 +129,13 @@ func (provider *ECS) Provide(configurationChan chan<- types.ConfigMessage, pool
Configuration: configuration,
}
if provider.Watch {
reload := time.NewTicker(time.Second * time.Duration(provider.RefreshSeconds))
if p.Watch {
reload := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
defer reload.Stop()
for {
select {
case <-reload.C:
configuration, err := provider.loadECSConfig(ctx, aws)
configuration, err := p.loadECSConfig(ctx, aws)
if err != nil {
return handleCanceled(ctx, err)
}
@ -153,11 +154,11 @@ func (provider *ECS) Provide(configurationChan chan<- types.ConfigMessage, pool
}
notify := func(err error, time time.Duration) {
log.Errorf("ECS connection error %+v, retrying in %s", err, time)
log.Errorf("Provider connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to ECS api %+v", err)
log.Errorf("Cannot connect to Provider api %+v", err)
}
})
@ -169,32 +170,32 @@ func wrapAws(ctx context.Context, req *request.Request) error {
return req.Send()
}
func (provider *ECS) loadECSConfig(ctx context.Context, client *awsClient) (*types.Configuration, error) {
func (p *Provider) loadECSConfig(ctx context.Context, client *awsClient) (*types.Configuration, error) {
var ecsFuncMap = template.FuncMap{
"filterFrontends": provider.filterFrontends,
"getFrontendRule": provider.getFrontendRule,
"filterFrontends": p.filterFrontends,
"getFrontendRule": p.getFrontendRule,
}
instances, err := provider.listInstances(ctx, client)
instances, err := p.listInstances(ctx, client)
if err != nil {
return nil, err
}
instances = fun.Filter(provider.filterInstance, instances).([]ecsInstance)
instances = fun.Filter(p.filterInstance, instances).([]ecsInstance)
return provider.GetConfiguration("templates/ecs.tmpl", ecsFuncMap, struct {
return p.GetConfiguration("templates/ecs.tmpl", ecsFuncMap, struct {
Instances []ecsInstance
}{
instances,
})
}
// Find all running ECS tasks in a cluster, also collect the task definitions (for docker labels)
// Find all running Provider tasks in a cluster, also collect the task definitions (for docker labels)
// and the EC2 instance data
func (provider *ECS) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) {
func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) {
var taskArns []*string
req, _ := client.ecs.ListTasksRequest(&ecs.ListTasksInput{
Cluster: &provider.Cluster,
Cluster: &p.Cluster,
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
})
@ -214,13 +215,13 @@ func (provider *ECS) listInstances(ctx context.Context, client *awsClient) ([]ec
return []ecsInstance{}, nil
}
chunkedTaskArns := provider.chunkedTaskArns(taskArns)
chunkedTaskArns := p.chunkedTaskArns(taskArns)
var tasks []*ecs.Task
for _, arns := range chunkedTaskArns {
req, taskResp := client.ecs.DescribeTasksRequest(&ecs.DescribeTasksInput{
Tasks: arns,
Cluster: &provider.Cluster,
Cluster: &p.Cluster,
})
if err := wrapAws(ctx, req); err != nil {
@ -247,12 +248,12 @@ func (provider *ECS) listInstances(ctx context.Context, client *awsClient) ([]ec
}
}
machines, err := provider.lookupEc2Instances(ctx, client, containerInstanceArns)
machines, err := p.lookupEc2Instances(ctx, client, containerInstanceArns)
if err != nil {
return nil, err
}
taskDefinitions, err := provider.lookupTaskDefinitions(ctx, client, taskDefinitionArns)
taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, taskDefinitionArns)
if err != nil {
return nil, err
}
@ -289,7 +290,7 @@ func (provider *ECS) listInstances(ctx context.Context, client *awsClient) ([]ec
return instances, nil
}
func (provider *ECS) lookupEc2Instances(ctx context.Context, client *awsClient, containerArns []*string) ([]*ec2.Instance, error) {
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, containerArns []*string) ([]*ec2.Instance, error) {
order := make(map[string]int)
instanceIds := make([]*string, len(containerArns))
@ -300,7 +301,7 @@ func (provider *ECS) lookupEc2Instances(ctx context.Context, client *awsClient,
req, _ := client.ecs.DescribeContainerInstancesRequest(&ecs.DescribeContainerInstancesInput{
ContainerInstances: containerArns,
Cluster: &provider.Cluster,
Cluster: &p.Cluster,
})
for ; req != nil; req = req.NextPage() {
@ -336,7 +337,7 @@ func (provider *ECS) lookupEc2Instances(ctx context.Context, client *awsClient,
return instances, nil
}
func (provider *ECS) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns []*string) ([]*ecs.TaskDefinition, error) {
func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns []*string) ([]*ecs.TaskDefinition, error) {
taskDefinitions := make([]*ecs.TaskDefinition, len(taskDefArns))
for i, arn := range taskDefArns {
@ -360,7 +361,7 @@ func (i ecsInstance) label(k string) string {
return ""
}
func (provider *ECS) filterInstance(i ecsInstance) bool {
func (p *Provider) filterInstance(i ecsInstance) bool {
if len(i.container.NetworkBindings) == 0 {
log.Debugf("Filtering ecs instance without port %s (%s)", i.Name, i.ID)
return false
@ -384,7 +385,7 @@ func (provider *ECS) filterInstance(i ecsInstance) bool {
}
label := i.label("traefik.enable")
enabled := provider.ExposedByDefault && label != "false" || label == "true"
enabled := p.ExposedByDefault && label != "false" || label == "true"
if !enabled {
log.Debugf("Filtering disabled ecs instance %s (%s) (traefik.enabled = '%s')", i.Name, i.ID, label)
return false
@ -393,7 +394,7 @@ func (provider *ECS) filterInstance(i ecsInstance) bool {
return true
}
func (provider *ECS) filterFrontends(instances []ecsInstance) []ecsInstance {
func (p *Provider) filterFrontends(instances []ecsInstance) []ecsInstance {
byName := make(map[string]bool)
return fun.Filter(func(i ecsInstance) bool {
@ -406,16 +407,16 @@ func (provider *ECS) filterFrontends(instances []ecsInstance) []ecsInstance {
}, instances).([]ecsInstance)
}
func (provider *ECS) getFrontendRule(i ecsInstance) string {
func (p *Provider) getFrontendRule(i ecsInstance) string {
if label := i.label("traefik.frontend.rule"); label != "" {
return label
}
return "Host:" + strings.ToLower(strings.Replace(i.Name, "_", "-", -1)) + "." + provider.Domain
return "Host:" + strings.ToLower(strings.Replace(i.Name, "_", "-", -1)) + "." + p.Domain
}
// ECS expects no more than 100 parameters be passed to a DescribeTask call; thus, pack
// Provider expects no more than 100 parameters be passed to a DescribeTask call; thus, pack
// each string into an array capped at 100 elements
func (provider *ECS) chunkedTaskArns(tasks []*string) [][]*string {
func (p *Provider) chunkedTaskArns(tasks []*string) [][]*string {
var chunkedTasks [][]*string
for i := 0; i < len(tasks); i += 100 {
sliceEnd := -1

View file

@ -1,4 +1,4 @@
package provider
package ecs
import (
"reflect"
@ -299,7 +299,7 @@ func TestFilterInstance(t *testing.T) {
}
for i, c := range cases {
provider := &ECS{
provider := &Provider{
ExposedByDefault: c.exposedByDefault,
}
value := provider.filterInstance(c.instanceInfo)
@ -310,7 +310,7 @@ func TestFilterInstance(t *testing.T) {
}
func TestTaskChunking(t *testing.T) {
provider := &ECS{}
provider := &Provider{}
testval := "a"
cases := []struct {

View file

@ -1,35 +0,0 @@
package provider
import (
"fmt"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/etcd"
)
var _ Provider = (*Etcd)(nil)
// Etcd holds configurations of the Etcd provider.
type Etcd struct {
Kv `mapstructure:",squash"`
}
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Etcd) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
store, err := provider.CreateStore()
if err != nil {
return fmt.Errorf("Failed to Connect to KV store: %v", err)
}
provider.kvclient = store
return provider.provide(configurationChan, pool, constraints)
}
// CreateStore creates the KV store
func (provider *Etcd) CreateStore() (store.Store, error) {
provider.storeType = store.ETCD
etcd.Register()
return provider.createStore()
}

37
provider/etcd/etcd.go Normal file
View file

@ -0,0 +1,37 @@
package etcd
import (
"fmt"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/provider/kv"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/etcd"
)
var _ provider.Provider = (*Provider)(nil)
// Provider holds configurations of the provider.
type Provider struct {
kv.Provider `mapstructure:",squash"`
}
// Provide allows the etcd provider to Provide configurations to traefik
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
store, err := p.CreateStore()
if err != nil {
return fmt.Errorf("Failed to Connect to KV store: %v", err)
}
p.Kvclient = store
return p.Provider.Provide(configurationChan, pool, constraints)
}
// CreateStore creates the KV store
func (p *Provider) CreateStore() (store.Store, error) {
p.StoreType = store.ETCD
etcd.Register()
return p.Provider.CreateStore()
}

View file

@ -1,4 +1,4 @@
package provider
package eureka
import (
"io/ioutil"
@ -11,25 +11,26 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
)
// Eureka holds configuration of the Eureka provider.
type Eureka struct {
BaseProvider `mapstructure:",squash"`
Endpoint string
Delay string
// Provider holds configuration of the Provider provider.
type Provider struct {
provider.BaseProvider `mapstructure:",squash"`
Endpoint string
Delay string
}
// Provide allows the provider to provide configurations to traefik
// Provide allows the eureka provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Eureka) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, _ types.Constraints) error {
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, _ types.Constraints) error {
operation := func() error {
configuration, err := provider.buildConfiguration()
configuration, err := p.buildConfiguration()
if err != nil {
log.Errorf("Failed to build configuration for Eureka, error: %s", err)
log.Errorf("Failed to build configuration for Provider, error: %s", err)
return err
}
@ -39,11 +40,11 @@ func (provider *Eureka) Provide(configurationChan chan<- types.ConfigMessage, po
}
var delay time.Duration
if len(provider.Delay) > 0 {
if len(p.Delay) > 0 {
var err error
delay, err = time.ParseDuration(provider.Delay)
delay, err = time.ParseDuration(p.Delay)
if err != nil {
log.Errorf("Failed to parse delay for Eureka, error: %s", err)
log.Errorf("Failed to parse delay for Provider, error: %s", err)
return err
}
} else {
@ -54,11 +55,11 @@ func (provider *Eureka) Provide(configurationChan chan<- types.ConfigMessage, po
go func() {
for t := range ticker.C {
log.Debug("Refreshing Eureka " + t.String())
log.Debug("Refreshing Provider " + t.String())
configuration, err := provider.buildConfiguration()
configuration, err := p.buildConfiguration()
if err != nil {
log.Errorf("Failed to refresh Eureka configuration, error: %s", err)
log.Errorf("Failed to refresh Provider configuration, error: %s", err)
return
}
@ -72,29 +73,29 @@ func (provider *Eureka) Provide(configurationChan chan<- types.ConfigMessage, po
}
notify := func(err error, time time.Duration) {
log.Errorf("Eureka connection error %+v, retrying in %s", err, time)
log.Errorf("Provider connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Eureka server %+v", err)
log.Errorf("Cannot connect to Provider server %+v", err)
return err
}
return nil
}
// Build the configuration from Eureka server
func (provider *Eureka) buildConfiguration() (*types.Configuration, error) {
// Build the configuration from Provider server
func (p *Provider) buildConfiguration() (*types.Configuration, error) {
var EurekaFuncMap = template.FuncMap{
"getPort": provider.getPort,
"getProtocol": provider.getProtocol,
"getWeight": provider.getWeight,
"getInstanceID": provider.getInstanceID,
"getPort": p.getPort,
"getProtocol": p.getProtocol,
"getWeight": p.getWeight,
"getInstanceID": p.getInstanceID,
}
eureka.GetLogger().SetOutput(ioutil.Discard)
client := eureka.NewClient([]string{
provider.Endpoint,
p.Endpoint,
})
applications, err := client.GetApplications()
@ -108,37 +109,37 @@ func (provider *Eureka) buildConfiguration() (*types.Configuration, error) {
applications.Applications,
}
configuration, err := provider.GetConfiguration("templates/eureka.tmpl", EurekaFuncMap, templateObjects)
configuration, err := p.GetConfiguration("templates/eureka.tmpl", EurekaFuncMap, templateObjects)
if err != nil {
log.Error(err)
}
return configuration, nil
}
func (provider *Eureka) getPort(instance eureka.InstanceInfo) string {
func (p *Provider) getPort(instance eureka.InstanceInfo) string {
if instance.SecurePort.Enabled {
return strconv.Itoa(instance.SecurePort.Port)
}
return strconv.Itoa(instance.Port.Port)
}
func (provider *Eureka) getProtocol(instance eureka.InstanceInfo) string {
func (p *Provider) getProtocol(instance eureka.InstanceInfo) string {
if instance.SecurePort.Enabled {
return "https"
}
return "http"
}
func (provider *Eureka) getWeight(instance eureka.InstanceInfo) string {
func (p *Provider) getWeight(instance eureka.InstanceInfo) string {
if val, ok := instance.Metadata.Map["traefik.weight"]; ok {
return val
}
return "0"
}
func (provider *Eureka) getInstanceID(instance eureka.InstanceInfo) string {
func (p *Provider) getInstanceID(instance eureka.InstanceInfo) string {
if val, ok := instance.Metadata.Map["traefik.backend.id"]; ok {
return val
}
return strings.Replace(instance.IpAddr, ".", "-", -1) + "-" + provider.getPort(instance)
return strings.Replace(instance.IpAddr, ".", "-", -1) + "-" + p.getPort(instance)
}

View file

@ -1,4 +1,4 @@
package provider
package eureka
import (
"testing"
@ -35,7 +35,7 @@ func TestEurekaGetPort(t *testing.T) {
},
}
eurekaProvider := &Eureka{}
eurekaProvider := &Provider{}
for _, c := range cases {
port := eurekaProvider.getPort(c.instanceInfo)
if port != c.expectedPort {
@ -73,7 +73,7 @@ func TestEurekaGetProtocol(t *testing.T) {
},
}
eurekaProvider := &Eureka{}
eurekaProvider := &Provider{}
for _, c := range cases {
protocol := eurekaProvider.getProtocol(c.instanceInfo)
if protocol != c.expectedProtocol {
@ -113,7 +113,7 @@ func TestEurekaGetWeight(t *testing.T) {
},
}
eurekaProvider := &Eureka{}
eurekaProvider := &Provider{}
for _, c := range cases {
weight := eurekaProvider.getWeight(c.instanceInfo)
if weight != c.expectedWeight {
@ -161,7 +161,7 @@ func TestEurekaGetInstanceId(t *testing.T) {
},
}
eurekaProvider := &Eureka{}
eurekaProvider := &Provider{}
for _, c := range cases {
id := eurekaProvider.getInstanceID(c.instanceInfo)
if id != c.expectedID {

View file

@ -1,4 +1,4 @@
package provider
package file
import (
"os"
@ -7,35 +7,36 @@ import (
"github.com/BurntSushi/toml"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"gopkg.in/fsnotify.v1"
)
var _ Provider = (*File)(nil)
var _ provider.Provider = (*Provider)(nil)
// File holds configurations of the File provider.
type File struct {
BaseProvider `mapstructure:",squash"`
// Provider holds configurations of the provider.
type Provider struct {
provider.BaseProvider `mapstructure:",squash"`
}
// Provide allows the provider to provide configurations to traefik
// Provide allows the file provider to provide configurations to traefik
// using the given configuration channel.
func (provider *File) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Error("Error creating file watcher", err)
return err
}
file, err := os.Open(provider.Filename)
file, err := os.Open(p.Filename)
if err != nil {
log.Error("Error opening file", err)
return err
}
defer file.Close()
if provider.Watch {
if p.Watch {
// Process events
pool.Go(func(stop chan bool) {
defer watcher.Close()
@ -45,8 +46,8 @@ func (provider *File) Provide(configurationChan chan<- types.ConfigMessage, pool
return
case event := <-watcher.Events:
if strings.Contains(event.Name, file.Name()) {
log.Debug("File event:", event)
configuration := provider.loadFileConfig(file.Name())
log.Debug("Provider event:", event)
configuration := p.loadFileConfig(file.Name())
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "file",
@ -66,7 +67,7 @@ func (provider *File) Provide(configurationChan chan<- types.ConfigMessage, pool
}
}
configuration := provider.loadFileConfig(file.Name())
configuration := p.loadFileConfig(file.Name())
configurationChan <- types.ConfigMessage{
ProviderName: "file",
Configuration: configuration,
@ -74,7 +75,7 @@ func (provider *File) Provide(configurationChan chan<- types.ConfigMessage, pool
return nil
}
func (provider *File) loadFileConfig(filename string) *types.Configuration {
func (p *Provider) loadFileConfig(filename string) *types.Configuration {
configuration := new(types.Configuration)
if _, err := toml.DecodeFile(filename, configuration); err != nil {
log.Error("Error reading file:", err)

View file

@ -1 +0,0 @@
package provider

View file

@ -1,4 +1,4 @@
package k8s
package kubernetes
import (
"errors"
@ -20,8 +20,8 @@ import (
const resyncPeriod = time.Minute * 5
// Client is a client for the Kubernetes master.
// WatchAll starts the watch of the Kubernetes ressources and updates the stores.
// Client is a client for the Provider master.
// WatchAll starts the watch of the Provider ressources and updates the stores.
// The stores can then be accessed via the Get* functions.
type Client interface {
GetIngresses(namespaces Namespaces) []*v1beta1.Ingress
@ -42,7 +42,7 @@ type clientImpl struct {
clientset *kubernetes.Clientset
}
// NewInClusterClient returns a new Kubernetes client that is expected to run
// NewInClusterClient returns a new Provider client that is expected to run
// inside the cluster.
func NewInClusterClient(endpoint string) (Client, error) {
config, err := rest.InClusterConfig()
@ -57,7 +57,7 @@ func NewInClusterClient(endpoint string) (Client, error) {
return createClientFromConfig(config)
}
// NewExternalClusterClient returns a new Kubernetes client that may run outside
// NewExternalClusterClient returns a new Provider client that may run outside
// of the cluster.
// The endpoint parameter must not be empty.
func NewExternalClusterClient(endpoint, token, caFilePath string) (Client, error) {
@ -108,7 +108,7 @@ func (c *clientImpl) GetIngresses(namespaces Namespaces) []*v1beta1.Ingress {
return result
}
// WatchIngresses starts the watch of Kubernetes Ingresses resources and updates the corresponding store
// WatchIngresses starts the watch of Provider Ingresses resources and updates the corresponding store
func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := NewListWatchFromClient(
c.clientset.ExtensionsV1beta1().RESTClient(),
@ -154,7 +154,7 @@ func (c *clientImpl) GetService(namespace, name string) (*v1.Service, bool, erro
return service, exists, err
}
// WatchServices starts the watch of Kubernetes Service resources and updates the corresponding store
// WatchServices starts the watch of Provider Service resources and updates the corresponding store
func (c *clientImpl) WatchServices(watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
c.clientset.CoreV1().RESTClient(),
@ -183,7 +183,7 @@ func (c *clientImpl) GetEndpoints(namespace, name string) (*v1.Endpoints, bool,
return endpoint, exists, err
}
// WatchEndpoints starts the watch of Kubernetes Endpoints resources and updates the corresponding store
// WatchEndpoints starts the watch of Provider Endpoints resources and updates the corresponding store
func (c *clientImpl) WatchEndpoints(watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
c.clientset.CoreV1().RESTClient(),

View file

@ -1,4 +1,4 @@
package provider
package kubernetes
import (
"fmt"
@ -12,14 +12,14 @@ import (
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/util/intstr"
)
var _ Provider = (*Kubernetes)(nil)
var _ provider.Provider = (*Provider)(nil)
const (
annotationFrontendRuleType = "traefik.frontend.rule.type"
@ -29,49 +29,49 @@ const (
ruleTypePathPrefix = "PathPrefix"
)
// Kubernetes holds configurations of the Kubernetes provider.
type Kubernetes struct {
BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Kubernetes server endpoint (required for external cluster client)"`
Token string `description:"Kubernetes bearer token (not needed for in-cluster client)"`
CertAuthFilePath string `description:"Kubernetes certificate authority file path (not needed for in-cluster client)"`
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"`
Namespaces k8s.Namespaces `description:"Kubernetes namespaces"`
LabelSelector string `description:"Kubernetes api label selector to use"`
// Provider holds configurations of the provider.
type Provider struct {
provider.BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Kubernetes server endpoint (required for external cluster client)"`
Token string `description:"Kubernetes bearer token (not needed for in-cluster client)"`
CertAuthFilePath string `description:"Kubernetes certificate authority file path (not needed for in-cluster client)"`
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"`
Namespaces Namespaces `description:"Kubernetes namespaces"`
LabelSelector string `description:"Kubernetes api label selector to use"`
lastConfiguration safe.Safe
}
func (provider *Kubernetes) newK8sClient() (k8s.Client, error) {
func (p *Provider) newK8sClient() (Client, error) {
withEndpoint := ""
if provider.Endpoint != "" {
withEndpoint = fmt.Sprintf(" with endpoint %v", provider.Endpoint)
if p.Endpoint != "" {
withEndpoint = fmt.Sprintf(" with endpoint %v", p.Endpoint)
}
if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
log.Infof("Creating in-cluster Kubernetes client%s\n", withEndpoint)
return k8s.NewInClusterClient(provider.Endpoint)
log.Infof("Creating in-cluster Provider client%s\n", withEndpoint)
return NewInClusterClient(p.Endpoint)
}
log.Infof("Creating cluster-external Kubernetes client%s\n", withEndpoint)
return k8s.NewExternalClusterClient(provider.Endpoint, provider.Token, provider.CertAuthFilePath)
log.Infof("Creating cluster-external Provider client%s\n", withEndpoint)
return NewExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath)
}
// Provide allows the provider to provide configurations to traefik
// Provide allows the k8s provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
k8sClient, err := provider.newK8sClient()
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
k8sClient, err := p.newK8sClient()
if err != nil {
return err
}
provider.Constraints = append(provider.Constraints, constraints...)
p.Constraints = append(p.Constraints, constraints...)
pool.Go(func(stop chan bool) {
operation := func() error {
for {
stopWatch := make(chan struct{}, 1)
defer close(stopWatch)
log.Debugf("Using label selector: '%s'", provider.LabelSelector)
eventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
log.Debugf("Using label selector: '%s'", p.LabelSelector)
eventsChan, err := k8sClient.WatchAll(p.LabelSelector, stopWatch)
if err != nil {
log.Errorf("Error watching kubernetes events: %v", err)
timer := time.NewTimer(1 * time.Second)
@ -88,17 +88,17 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
return nil
case event := <-eventsChan:
log.Debugf("Received event from kubernetes %+v", event)
templateObjects, err := provider.loadIngresses(k8sClient)
templateObjects, err := p.loadIngresses(k8sClient)
if err != nil {
return err
}
if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) {
if reflect.DeepEqual(p.lastConfiguration.Get(), templateObjects) {
log.Debugf("Skipping event from kubernetes %+v", event)
} else {
provider.lastConfiguration.Set(templateObjects)
p.lastConfiguration.Set(templateObjects)
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
Configuration: p.loadConfig(*templateObjects),
}
}
}
@ -107,19 +107,19 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
}
notify := func(err error, time time.Duration) {
log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time)
log.Errorf("Provider connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Kubernetes server %+v", err)
log.Errorf("Cannot connect to Provider server %+v", err)
}
})
return nil
}
func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configuration, error) {
ingresses := k8sClient.GetIngresses(provider.Namespaces)
func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error) {
ingresses := k8sClient.GetIngresses(p.Namespaces)
templateObjects := types.Configuration{
map[string]*types.Backend{},
@ -148,7 +148,7 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
}
}
PassHostHeader := provider.getPassHostHeader()
PassHostHeader := p.getPassHostHeader()
passHostHeaderAnnotation := i.Annotations["traefik.frontend.passHostHeader"]
switch passHostHeaderAnnotation {
@ -311,16 +311,16 @@ func shouldProcessIngress(ingressClass string) bool {
}
}
func (provider *Kubernetes) getPassHostHeader() bool {
if provider.DisablePassHostHeaders {
func (p *Provider) getPassHostHeader() bool {
if p.DisablePassHostHeaders {
return false
}
return true
}
func (provider *Kubernetes) loadConfig(templateObjects types.Configuration) *types.Configuration {
func (p *Provider) loadConfig(templateObjects types.Configuration) *types.Configuration {
var FuncMap = template.FuncMap{}
configuration, err := provider.GetConfiguration("templates/kubernetes.tmpl", FuncMap, templateObjects)
configuration, err := p.GetConfiguration("templates/kubernetes.tmpl", FuncMap, templateObjects)
if err != nil {
log.Error(err)
}

View file

@ -1,4 +1,4 @@
package provider
package kubernetes
import (
"encoding/json"
@ -8,7 +8,6 @@ import (
"strings"
"testing"
"github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/types"
"github.com/davecgh/go-spew/spew"
"k8s.io/client-go/pkg/api/v1"
@ -225,7 +224,7 @@ func TestLoadIngresses(t *testing.T) {
endpoints: endpoints,
watchChan: watchChan,
}
provider := Kubernetes{}
provider := Provider{}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
@ -409,7 +408,7 @@ func TestRuleType(t *testing.T) {
services: []*v1.Service{service},
watchChan: watchChan,
}
provider := Kubernetes{DisablePassHostHeaders: true}
provider := Provider{DisablePassHostHeaders: true}
actualConfig, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error loading ingresses: %+v", err)
@ -490,7 +489,7 @@ func TestGetPassHostHeader(t *testing.T) {
services: services,
watchChan: watchChan,
}
provider := Kubernetes{DisablePassHostHeaders: true}
provider := Provider{DisablePassHostHeaders: true}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
@ -602,7 +601,7 @@ func TestOnlyReferencesServicesFromOwnNamespace(t *testing.T) {
services: services,
watchChan: watchChan,
}
provider := Kubernetes{}
provider := Provider{}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
@ -789,7 +788,7 @@ func TestLoadNamespacedIngresses(t *testing.T) {
services: services,
watchChan: watchChan,
}
provider := Kubernetes{
provider := Provider{
Namespaces: []string{"awesome"},
}
actual, err := provider.loadIngresses(client)
@ -1033,7 +1032,7 @@ func TestLoadMultipleNamespacedIngresses(t *testing.T) {
services: services,
watchChan: watchChan,
}
provider := Kubernetes{
provider := Provider{
Namespaces: []string{"awesome", "somewhat-awesome"},
}
actual, err := provider.loadIngresses(client)
@ -1182,7 +1181,7 @@ func TestHostlessIngress(t *testing.T) {
services: services,
watchChan: watchChan,
}
provider := Kubernetes{DisablePassHostHeaders: true}
provider := Provider{DisablePassHostHeaders: true}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
@ -1382,7 +1381,7 @@ func TestServiceAnnotations(t *testing.T) {
endpoints: endpoints,
watchChan: watchChan,
}
provider := Kubernetes{}
provider := Provider{}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
@ -1577,7 +1576,7 @@ func TestIngressAnnotations(t *testing.T) {
endpoints: endpoints,
watchChan: watchChan,
}
provider := Kubernetes{}
provider := Provider{}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
@ -1710,7 +1709,7 @@ func TestInvalidPassHostHeaderValue(t *testing.T) {
endpoints: endpoints,
watchChan: watchChan,
}
provider := Kubernetes{}
provider := Provider{}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
@ -1892,7 +1891,7 @@ func TestKubeAPIErrors(t *testing.T) {
apiEndpointsError: tc.apiEndpointsErr,
}
provider := Kubernetes{}
provider := Provider{}
if _, err := provider.loadIngresses(client); err != apiErr {
t.Errorf("Got error %v, wanted error %v", err, apiErr)
}
@ -2022,7 +2021,7 @@ func TestMissingResources(t *testing.T) {
// See https://github.com/containous/traefik/issues/1307
properExists: true,
}
provider := Kubernetes{}
provider := Provider{}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
@ -2098,11 +2097,11 @@ type clientMock struct {
properExists bool
}
func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress {
func (c clientMock) GetIngresses(namespaces Namespaces) []*v1beta1.Ingress {
result := make([]*v1beta1.Ingress, 0, len(c.ingresses))
for _, ingress := range c.ingresses {
if k8s.HasNamespace(ingress, namespaces) {
if HasNamespace(ingress, namespaces) {
result = append(result, ingress)
}
}

View file

@ -1,4 +1,4 @@
package k8s
package kubernetes
import (
"fmt"

View file

@ -1,5 +1,4 @@
// Package provider holds the different provider implementation.
package provider
package kv
import (
"errors"
@ -12,45 +11,47 @@ import (
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
)
// Kv holds common configurations of key-value providers.
type Kv struct {
BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Comma sepparated server endpoints"`
Prefix string `description:"Prefix used for KV store"`
TLS *ClientTLS `description:"Enable TLS support"`
storeType store.Backend
kvclient store.Store
// Provider holds common configurations of key-value providers.
type Provider struct {
provider.BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Comma sepparated server endpoints"`
Prefix string `description:"Prefix used for KV store"`
TLS *provider.ClientTLS `description:"Enable TLS support"`
StoreType store.Backend
Kvclient store.Store
}
func (provider *Kv) createStore() (store.Store, error) {
// CreateStore create the K/V store
func (p *Provider) CreateStore() (store.Store, error) {
storeConfig := &store.Config{
ConnectionTimeout: 30 * time.Second,
Bucket: "traefik",
}
if provider.TLS != nil {
if p.TLS != nil {
var err error
storeConfig.TLS, err = provider.TLS.CreateTLSConfig()
storeConfig.TLS, err = p.TLS.CreateTLSConfig()
if err != nil {
return nil, err
}
}
return libkv.NewStore(
provider.storeType,
strings.Split(provider.Endpoint, ","),
p.StoreType,
strings.Split(p.Endpoint, ","),
storeConfig,
)
}
func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) error {
func (p *Provider) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) error {
operation := func() error {
events, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}))
events, err := p.Kvclient.WatchTree(p.Prefix, make(chan struct{}))
if err != nil {
return fmt.Errorf("Failed to KV WatchTree: %v", err)
}
@ -62,10 +63,10 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix
if !ok {
return errors.New("watchtree channel closed")
}
configuration := provider.loadConfig()
configuration := p.loadConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: string(provider.storeType),
ProviderName: string(p.StoreType),
Configuration: configuration,
}
}
@ -83,23 +84,24 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix
return nil
}
func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
provider.Constraints = append(provider.Constraints, constraints...)
// Provide provides the configuration to traefik via the configuration channel
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
p.Constraints = append(p.Constraints, constraints...)
operation := func() error {
if _, err := provider.kvclient.Exists("qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"); err != nil {
if _, err := p.Kvclient.Exists("qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"); err != nil {
return fmt.Errorf("Failed to test KV store connection: %v", err)
}
if provider.Watch {
if p.Watch {
pool.Go(func(stop chan bool) {
err := provider.watchKv(configurationChan, provider.Prefix, stop)
err := p.watchKv(configurationChan, p.Prefix, stop)
if err != nil {
log.Errorf("Cannot watch KV store: %v", err)
}
})
}
configuration := provider.loadConfig()
configuration := p.loadConfig()
configurationChan <- types.ConfigMessage{
ProviderName: string(provider.storeType),
ProviderName: string(p.StoreType),
Configuration: configuration,
}
return nil
@ -114,23 +116,23 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
return nil
}
func (provider *Kv) loadConfig() *types.Configuration {
func (p *Provider) loadConfig() *types.Configuration {
templateObjects := struct {
Prefix string
}{
// Allow `/traefik/alias` to superesede `provider.Prefix`
strings.TrimSuffix(provider.get(provider.Prefix, provider.Prefix+"/alias"), "/"),
// Allow `/traefik/alias` to superesede `p.Prefix`
strings.TrimSuffix(p.get(p.Prefix, p.Prefix+"/alias"), "/"),
}
var KvFuncMap = template.FuncMap{
"List": provider.list,
"ListServers": provider.listServers,
"Get": provider.get,
"SplitGet": provider.splitGet,
"Last": provider.last,
"List": p.list,
"ListServers": p.listServers,
"Get": p.get,
"SplitGet": p.splitGet,
"Last": p.last,
}
configuration, err := provider.GetConfiguration("templates/kv.tmpl", KvFuncMap, templateObjects)
configuration, err := p.GetConfiguration("templates/kv.tmpl", KvFuncMap, templateObjects)
if err != nil {
log.Error(err)
}
@ -144,9 +146,9 @@ func (provider *Kv) loadConfig() *types.Configuration {
return configuration
}
func (provider *Kv) list(keys ...string) []string {
func (p *Provider) list(keys ...string) []string {
joinedKeys := strings.Join(keys, "")
keysPairs, err := provider.kvclient.List(joinedKeys)
keysPairs, err := p.Kvclient.List(joinedKeys)
if err != nil {
log.Debugf("Cannot get keys %s %s ", joinedKeys, err)
return nil
@ -159,23 +161,23 @@ func (provider *Kv) list(keys ...string) []string {
return fun.Values(directoryKeys).([]string)
}
func (provider *Kv) listServers(backend string) []string {
serverNames := provider.list(backend, "/servers/")
func (p *Provider) listServers(backend string) []string {
serverNames := p.list(backend, "/servers/")
return fun.Filter(func(serverName string) bool {
key := fmt.Sprint(serverName, "/url")
if _, err := provider.kvclient.Get(key); err != nil {
if _, err := p.Kvclient.Get(key); err != nil {
if err != store.ErrKeyNotFound {
log.Errorf("Failed to retrieve value for key %s: %s", key, err)
}
return false
}
return provider.checkConstraints(serverName, "/tags")
return p.checkConstraints(serverName, "/tags")
}, serverNames).([]string)
}
func (provider *Kv) get(defaultValue string, keys ...string) string {
func (p *Provider) get(defaultValue string, keys ...string) string {
joinedKeys := strings.Join(keys, "")
keyPair, err := provider.kvclient.Get(strings.TrimPrefix(joinedKeys, "/"))
keyPair, err := p.Kvclient.Get(strings.TrimPrefix(joinedKeys, "/"))
if err != nil {
log.Debugf("Cannot get key %s %s, setting default %s", joinedKeys, err, defaultValue)
return defaultValue
@ -186,9 +188,9 @@ func (provider *Kv) get(defaultValue string, keys ...string) string {
return string(keyPair.Value)
}
func (provider *Kv) splitGet(keys ...string) []string {
func (p *Provider) splitGet(keys ...string) []string {
joinedKeys := strings.Join(keys, "")
keyPair, err := provider.kvclient.Get(joinedKeys)
keyPair, err := p.Kvclient.Get(joinedKeys)
if err != nil {
log.Debugf("Cannot get key %s %s, setting default empty", joinedKeys, err)
return []string{}
@ -199,14 +201,14 @@ func (provider *Kv) splitGet(keys ...string) []string {
return strings.Split(string(keyPair.Value), ",")
}
func (provider *Kv) last(key string) string {
func (p *Provider) last(key string) string {
splittedKey := strings.Split(key, "/")
return splittedKey[len(splittedKey)-1]
}
func (provider *Kv) checkConstraints(keys ...string) bool {
func (p *Provider) checkConstraints(keys ...string) bool {
joinedKeys := strings.Join(keys, "")
keyPair, err := provider.kvclient.Get(joinedKeys)
keyPair, err := p.Kvclient.Get(joinedKeys)
value := ""
if err == nil && keyPair != nil && keyPair.Value != nil {
@ -214,7 +216,7 @@ func (provider *Kv) checkConstraints(keys ...string) bool {
}
constraintTags := strings.Split(value, ",")
ok, failingConstraint := provider.MatchConstraints(constraintTags)
ok, failingConstraint := p.MatchConstraints(constraintTags)
if ok == false {
if failingConstraint != nil {
log.Debugf("Constraint %v not matching with following tags: %v", failingConstraint.String(), value)

View file

@ -1,4 +1,4 @@
package provider
package kv
import (
"errors"
@ -14,27 +14,27 @@ import (
func TestKvList(t *testing.T) {
cases := []struct {
provider *Kv
provider *Provider
keys []string
expected []string
}{
{
provider: &Kv{
kvclient: &Mock{},
provider: &Provider{
Kvclient: &Mock{},
},
keys: []string{},
expected: []string{},
},
{
provider: &Kv{
kvclient: &Mock{},
provider: &Provider{
Kvclient: &Mock{},
},
keys: []string{"traefik"},
expected: []string{},
},
{
provider: &Kv{
kvclient: &Mock{
provider: &Provider{
Kvclient: &Mock{
KVPairs: []*store.KVPair{
{
Key: "foo",
@ -47,8 +47,8 @@ func TestKvList(t *testing.T) {
expected: []string{},
},
{
provider: &Kv{
kvclient: &Mock{
provider: &Provider{
Kvclient: &Mock{
KVPairs: []*store.KVPair{
{
Key: "foo",
@ -61,8 +61,8 @@ func TestKvList(t *testing.T) {
expected: []string{"foo"},
},
{
provider: &Kv{
kvclient: &Mock{
provider: &Provider{
Kvclient: &Mock{
KVPairs: []*store.KVPair{
{
Key: "foo/baz/1",
@ -94,8 +94,8 @@ func TestKvList(t *testing.T) {
}
// Error case
provider := &Kv{
kvclient: &Mock{
provider := &Provider{
Kvclient: &Mock{
Error: KvError{
List: store.ErrKeyNotFound,
},
@ -109,27 +109,27 @@ func TestKvList(t *testing.T) {
func TestKvGet(t *testing.T) {
cases := []struct {
provider *Kv
provider *Provider
keys []string
expected string
}{
{
provider: &Kv{
kvclient: &Mock{},
provider: &Provider{
Kvclient: &Mock{},
},
keys: []string{},
expected: "",
},
{
provider: &Kv{
kvclient: &Mock{},
provider: &Provider{
Kvclient: &Mock{},
},
keys: []string{"traefik"},
expected: "",
},
{
provider: &Kv{
kvclient: &Mock{
provider: &Provider{
Kvclient: &Mock{
KVPairs: []*store.KVPair{
{
Key: "foo",
@ -142,8 +142,8 @@ func TestKvGet(t *testing.T) {
expected: "",
},
{
provider: &Kv{
kvclient: &Mock{
provider: &Provider{
Kvclient: &Mock{
KVPairs: []*store.KVPair{
{
Key: "foo",
@ -156,8 +156,8 @@ func TestKvGet(t *testing.T) {
expected: "bar",
},
{
provider: &Kv{
kvclient: &Mock{
provider: &Provider{
Kvclient: &Mock{
KVPairs: []*store.KVPair{
{
Key: "foo/baz/1",
@ -187,8 +187,8 @@ func TestKvGet(t *testing.T) {
}
// Error case
provider := &Kv{
kvclient: &Mock{
provider := &Provider{
Kvclient: &Mock{
Error: KvError{
Get: store.ErrKeyNotFound,
},
@ -228,7 +228,7 @@ func TestKvLast(t *testing.T) {
},
}
provider := &Kv{}
provider := &Provider{}
for _, c := range cases {
actual := provider.last(c.key)
if actual != c.expected {
@ -238,7 +238,7 @@ func TestKvLast(t *testing.T) {
}
type KvMock struct {
Kv
Provider
}
func (provider *KvMock) loadConfig() *types.Configuration {
@ -248,8 +248,8 @@ func (provider *KvMock) loadConfig() *types.Configuration {
func TestKvWatchTree(t *testing.T) {
returnedChans := make(chan chan []*store.KVPair)
provider := &KvMock{
Kv{
kvclient: &Mock{
Provider{
Kvclient: &Mock{
WatchTreeMethod: func() <-chan []*store.KVPair {
c := make(chan []*store.KVPair, 10)
returnedChans <- c
@ -376,9 +376,9 @@ func (s *Mock) Close() {
}
func TestKVLoadConfig(t *testing.T) {
provider := &Kv{
provider := &Provider{
Prefix: "traefik",
kvclient: &Mock{
Kvclient: &Mock{
KVPairs: []*store.KVPair{
{
Key: "traefik/frontends/frontend.with.dot",

View file

@ -1,4 +1,4 @@
package provider
package marathon
import (
"errors"
@ -16,31 +16,32 @@ import (
"github.com/containous/flaeg"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/gambol99/go-marathon"
)
var _ Provider = (*Marathon)(nil)
var _ provider.Provider = (*Provider)(nil)
// Marathon holds configuration of the Marathon provider.
type Marathon struct {
BaseProvider
Endpoint string `description:"Marathon server endpoint. You can also specify multiple endpoint for Marathon"`
Domain string `description:"Default domain used"`
ExposedByDefault bool `description:"Expose Marathon apps by default"`
GroupsAsSubDomains bool `description:"Convert Marathon groups to subdomains"`
DCOSToken string `description:"DCOSToken for DCOS environment, This will override the Authorization header"`
MarathonLBCompatibility bool `description:"Add compatibility with marathon-lb labels"`
TLS *ClientTLS `description:"Enable Docker TLS support"`
DialerTimeout flaeg.Duration `description:"Set a non-default connection timeout for Marathon"`
KeepAlive flaeg.Duration `description:"Set a non-default TCP Keep Alive time in seconds"`
Basic *MarathonBasic
// Provider holds configuration of the provider.
type Provider struct {
provider.BaseProvider
Endpoint string `description:"Marathon server endpoint. You can also specify multiple endpoint for Marathon"`
Domain string `description:"Default domain used"`
ExposedByDefault bool `description:"Expose Marathon apps by default"`
GroupsAsSubDomains bool `description:"Convert Marathon groups to subdomains"`
DCOSToken string `description:"DCOSToken for DCOS environment, This will override the Authorization header"`
MarathonLBCompatibility bool `description:"Add compatibility with marathon-lb labels"`
TLS *provider.ClientTLS `description:"Enable Docker TLS support"`
DialerTimeout flaeg.Duration `description:"Set a non-default connection timeout for Marathon"`
KeepAlive flaeg.Duration `description:"Set a non-default TCP Keep Alive time in seconds"`
Basic *Basic
marathonClient marathon.Marathon
}
// MarathonBasic holds basic authentication specific configurations
type MarathonBasic struct {
// Basic holds basic authentication specific configurations
type Basic struct {
HTTPBasicAuthUser string
HTTPBasicPassword string
}
@ -50,30 +51,30 @@ type lightMarathonClient interface {
Applications(url.Values) (*marathon.Applications, error)
}
// Provide allows the provider to provide configurations to traefik
// Provide allows the marathon provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
provider.Constraints = append(provider.Constraints, constraints...)
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
p.Constraints = append(p.Constraints, constraints...)
operation := func() error {
config := marathon.NewDefaultConfig()
config.URL = provider.Endpoint
config.URL = p.Endpoint
config.EventsTransport = marathon.EventsTransportSSE
if provider.Basic != nil {
config.HTTPBasicAuthUser = provider.Basic.HTTPBasicAuthUser
config.HTTPBasicPassword = provider.Basic.HTTPBasicPassword
if p.Basic != nil {
config.HTTPBasicAuthUser = p.Basic.HTTPBasicAuthUser
config.HTTPBasicPassword = p.Basic.HTTPBasicPassword
}
if len(provider.DCOSToken) > 0 {
config.DCOSToken = provider.DCOSToken
if len(p.DCOSToken) > 0 {
config.DCOSToken = p.DCOSToken
}
TLSConfig, err := provider.TLS.CreateTLSConfig()
TLSConfig, err := p.TLS.CreateTLSConfig()
if err != nil {
return err
}
config.HTTPClient = &http.Client{
Transport: &http.Transport{
DialContext: (&net.Dialer{
KeepAlive: time.Duration(provider.KeepAlive),
Timeout: time.Duration(provider.DialerTimeout),
KeepAlive: time.Duration(p.KeepAlive),
Timeout: time.Duration(p.DialerTimeout),
}).DialContext,
TLSClientConfig: TLSConfig,
},
@ -83,9 +84,9 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
log.Errorf("Failed to create a client for marathon, error: %s", err)
return err
}
provider.marathonClient = client
p.marathonClient = client
if provider.Watch {
if p.Watch {
update, err := client.AddEventsListener(marathon.EventIDApplications)
if err != nil {
log.Errorf("Failed to register for events, %s", err)
@ -98,8 +99,8 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
case <-stop:
return
case event := <-update:
log.Debug("Marathon event receveived", event)
configuration := provider.loadMarathonConfig()
log.Debug("Provider event receveived", event)
configuration := p.loadMarathonConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "marathon",
@ -110,7 +111,7 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
}
})
}
configuration := provider.loadMarathonConfig()
configuration := p.loadMarathonConfig()
configurationChan <- types.ConfigMessage{
ProviderName: "marathon",
Configuration: configuration,
@ -119,45 +120,45 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
}
notify := func(err error, time time.Duration) {
log.Errorf("Marathon connection error %+v, retrying in %s", err, time)
log.Errorf("Provider connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Marathon server %+v", err)
log.Errorf("Cannot connect to Provider server %+v", err)
}
return nil
}
func (provider *Marathon) loadMarathonConfig() *types.Configuration {
func (p *Provider) loadMarathonConfig() *types.Configuration {
var MarathonFuncMap = template.FuncMap{
"getBackend": provider.getBackend,
"getBackendServer": provider.getBackendServer,
"getPort": provider.getPort,
"getWeight": provider.getWeight,
"getDomain": provider.getDomain,
"getProtocol": provider.getProtocol,
"getPassHostHeader": provider.getPassHostHeader,
"getPriority": provider.getPriority,
"getEntryPoints": provider.getEntryPoints,
"getFrontendRule": provider.getFrontendRule,
"getFrontendBackend": provider.getFrontendBackend,
"hasCircuitBreakerLabels": provider.hasCircuitBreakerLabels,
"hasLoadBalancerLabels": provider.hasLoadBalancerLabels,
"hasMaxConnLabels": provider.hasMaxConnLabels,
"getMaxConnExtractorFunc": provider.getMaxConnExtractorFunc,
"getMaxConnAmount": provider.getMaxConnAmount,
"getLoadBalancerMethod": provider.getLoadBalancerMethod,
"getCircuitBreakerExpression": provider.getCircuitBreakerExpression,
"getSticky": provider.getSticky,
"getBackend": p.getBackend,
"getBackendServer": p.getBackendServer,
"getPort": p.getPort,
"getWeight": p.getWeight,
"getDomain": p.getDomain,
"getProtocol": p.getProtocol,
"getPassHostHeader": p.getPassHostHeader,
"getPriority": p.getPriority,
"getEntryPoints": p.getEntryPoints,
"getFrontendRule": p.getFrontendRule,
"getFrontendBackend": p.getFrontendBackend,
"hasCircuitBreakerLabels": p.hasCircuitBreakerLabels,
"hasLoadBalancerLabels": p.hasLoadBalancerLabels,
"hasMaxConnLabels": p.hasMaxConnLabels,
"getMaxConnExtractorFunc": p.getMaxConnExtractorFunc,
"getMaxConnAmount": p.getMaxConnAmount,
"getLoadBalancerMethod": p.getLoadBalancerMethod,
"getCircuitBreakerExpression": p.getCircuitBreakerExpression,
"getSticky": p.getSticky,
}
applications, err := provider.marathonClient.Applications(nil)
applications, err := p.marathonClient.Applications(nil)
if err != nil {
log.Errorf("Failed to create a client for marathon, error: %s", err)
return nil
}
tasks, err := provider.marathonClient.AllTasks(&marathon.AllTasksOpts{Status: "running"})
tasks, err := p.marathonClient.AllTasks(&marathon.AllTasksOpts{Status: "running"})
if err != nil {
log.Errorf("Failed to create a client for marathon, error: %s", err)
return nil
@ -165,12 +166,12 @@ func (provider *Marathon) loadMarathonConfig() *types.Configuration {
//filter tasks
filteredTasks := fun.Filter(func(task marathon.Task) bool {
return provider.taskFilter(task, applications, provider.ExposedByDefault)
return p.taskFilter(task, applications, p.ExposedByDefault)
}, tasks.Tasks).([]marathon.Task)
//filter apps
filteredApps := fun.Filter(func(app marathon.Application) bool {
return provider.applicationFilter(app, filteredTasks)
return p.applicationFilter(app, filteredTasks)
}, applications.Apps).([]marathon.Application)
templateObjects := struct {
@ -180,17 +181,17 @@ func (provider *Marathon) loadMarathonConfig() *types.Configuration {
}{
filteredApps,
filteredTasks,
provider.Domain,
p.Domain,
}
configuration, err := provider.GetConfiguration("templates/marathon.tmpl", MarathonFuncMap, templateObjects)
configuration, err := p.GetConfiguration("templates/marathon.tmpl", MarathonFuncMap, templateObjects)
if err != nil {
log.Error(err)
}
return configuration
}
func (provider *Marathon) taskFilter(task marathon.Task, applications *marathon.Applications, exposedByDefaultFlag bool) bool {
func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applications, exposedByDefaultFlag bool) bool {
application, err := getApplication(task, applications.Apps)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
@ -201,14 +202,14 @@ func (provider *Marathon) taskFilter(task marathon.Task, applications *marathon.
log.Debug("Filtering marathon task without port %s", task.AppID)
return false
}
label, _ := provider.getLabel(application, "traefik.tags")
label, _ := p.getLabel(application, "traefik.tags")
constraintTags := strings.Split(label, ",")
if provider.MarathonLBCompatibility {
if label, err := provider.getLabel(application, "HAPROXY_GROUP"); err == nil {
if p.MarathonLBCompatibility {
if label, err := p.getLabel(application, "HAPROXY_GROUP"); err == nil {
constraintTags = append(constraintTags, label)
}
}
if ok, failingConstraint := provider.MatchConstraints(constraintTags); !ok {
if ok, failingConstraint := p.MatchConstraints(constraintTags); !ok {
if failingConstraint != nil {
log.Debugf("Application %v pruned by '%v' constraint", application.ID, failingConstraint.String())
}
@ -257,15 +258,15 @@ func (provider *Marathon) taskFilter(task marathon.Task, applications *marathon.
return true
}
func (provider *Marathon) applicationFilter(app marathon.Application, filteredTasks []marathon.Task) bool {
label, _ := provider.getLabel(app, "traefik.tags")
func (p *Provider) applicationFilter(app marathon.Application, filteredTasks []marathon.Task) bool {
label, _ := p.getLabel(app, "traefik.tags")
constraintTags := strings.Split(label, ",")
if provider.MarathonLBCompatibility {
if label, err := provider.getLabel(app, "HAPROXY_GROUP"); err == nil {
if p.MarathonLBCompatibility {
if label, err := p.getLabel(app, "HAPROXY_GROUP"); err == nil {
constraintTags = append(constraintTags, label)
}
}
if ok, failingConstraint := provider.MatchConstraints(constraintTags); !ok {
if ok, failingConstraint := p.MatchConstraints(constraintTags); !ok {
if failingConstraint != nil {
log.Debugf("Application %v pruned by '%v' constraint", app.ID, failingConstraint.String())
}
@ -290,7 +291,7 @@ func isApplicationEnabled(application marathon.Application, exposedByDefault boo
return exposedByDefault && (*application.Labels)["traefik.enable"] != "false" || (*application.Labels)["traefik.enable"] == "true"
}
func (provider *Marathon) getLabel(application marathon.Application, label string) (string, error) {
func (p *Provider) getLabel(application marathon.Application, label string) (string, error) {
for key, value := range *application.Labels {
if key == label {
return value, nil
@ -299,19 +300,19 @@ func (provider *Marathon) getLabel(application marathon.Application, label strin
return "", errors.New("Label not found:" + label)
}
func (provider *Marathon) getPort(task marathon.Task, applications []marathon.Application) string {
func (p *Provider) getPort(task marathon.Task, applications []marathon.Application) string {
application, err := getApplication(task, applications)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
return ""
}
ports := processPorts(application, task)
if portIndexLabel, err := provider.getLabel(application, "traefik.portIndex"); err == nil {
if portIndexLabel, err := p.getLabel(application, "traefik.portIndex"); err == nil {
if index, err := strconv.Atoi(portIndexLabel); err == nil {
return strconv.Itoa(ports[index])
}
}
if portValueLabel, err := provider.getLabel(application, "traefik.port"); err == nil {
if portValueLabel, err := p.getLabel(application, "traefik.port"); err == nil {
return portValueLabel
}
@ -321,60 +322,60 @@ func (provider *Marathon) getPort(task marathon.Task, applications []marathon.Ap
return ""
}
func (provider *Marathon) getWeight(task marathon.Task, applications []marathon.Application) string {
func (p *Provider) getWeight(task marathon.Task, applications []marathon.Application) string {
application, errApp := getApplication(task, applications)
if errApp != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
return "0"
}
if label, err := provider.getLabel(application, "traefik.weight"); err == nil {
if label, err := p.getLabel(application, "traefik.weight"); err == nil {
return label
}
return "0"
}
func (provider *Marathon) getDomain(application marathon.Application) string {
if label, err := provider.getLabel(application, "traefik.domain"); err == nil {
func (p *Provider) getDomain(application marathon.Application) string {
if label, err := p.getLabel(application, "traefik.domain"); err == nil {
return label
}
return provider.Domain
return p.Domain
}
func (provider *Marathon) getProtocol(task marathon.Task, applications []marathon.Application) string {
func (p *Provider) getProtocol(task marathon.Task, applications []marathon.Application) string {
application, errApp := getApplication(task, applications)
if errApp != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
return "http"
}
if label, err := provider.getLabel(application, "traefik.protocol"); err == nil {
if label, err := p.getLabel(application, "traefik.protocol"); err == nil {
return label
}
return "http"
}
func (provider *Marathon) getSticky(application marathon.Application) string {
if sticky, err := provider.getLabel(application, "traefik.backend.loadbalancer.sticky"); err == nil {
func (p *Provider) getSticky(application marathon.Application) string {
if sticky, err := p.getLabel(application, "traefik.backend.loadbalancer.sticky"); err == nil {
return sticky
}
return "false"
}
func (provider *Marathon) getPassHostHeader(application marathon.Application) string {
if passHostHeader, err := provider.getLabel(application, "traefik.frontend.passHostHeader"); err == nil {
func (p *Provider) getPassHostHeader(application marathon.Application) string {
if passHostHeader, err := p.getLabel(application, "traefik.frontend.passHostHeader"); err == nil {
return passHostHeader
}
return "true"
}
func (provider *Marathon) getPriority(application marathon.Application) string {
if priority, err := provider.getLabel(application, "traefik.frontend.priority"); err == nil {
func (p *Provider) getPriority(application marathon.Application) string {
if priority, err := p.getLabel(application, "traefik.frontend.priority"); err == nil {
return priority
}
return "0"
}
func (provider *Marathon) getEntryPoints(application marathon.Application) []string {
if entryPoints, err := provider.getLabel(application, "traefik.frontend.entryPoints"); err == nil {
func (p *Provider) getEntryPoints(application marathon.Application) []string {
if entryPoints, err := p.getLabel(application, "traefik.frontend.entryPoints"); err == nil {
return strings.Split(entryPoints, ",")
}
return []string{}
@ -382,72 +383,72 @@ func (provider *Marathon) getEntryPoints(application marathon.Application) []str
// getFrontendRule returns the frontend rule for the specified application, using
// it's label. It returns a default one (Host) if the label is not present.
func (provider *Marathon) getFrontendRule(application marathon.Application) string {
if label, err := provider.getLabel(application, "traefik.frontend.rule"); err == nil {
func (p *Provider) getFrontendRule(application marathon.Application) string {
if label, err := p.getLabel(application, "traefik.frontend.rule"); err == nil {
return label
}
if provider.MarathonLBCompatibility {
if label, err := provider.getLabel(application, "HAPROXY_0_VHOST"); err == nil {
if p.MarathonLBCompatibility {
if label, err := p.getLabel(application, "HAPROXY_0_VHOST"); err == nil {
return "Host:" + label
}
}
return "Host:" + provider.getSubDomain(application.ID) + "." + provider.Domain
return "Host:" + p.getSubDomain(application.ID) + "." + p.Domain
}
func (provider *Marathon) getBackend(task marathon.Task, applications []marathon.Application) string {
func (p *Provider) getBackend(task marathon.Task, applications []marathon.Application) string {
application, errApp := getApplication(task, applications)
if errApp != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
return ""
}
return provider.getFrontendBackend(application)
return p.getFrontendBackend(application)
}
func (provider *Marathon) getFrontendBackend(application marathon.Application) string {
if label, err := provider.getLabel(application, "traefik.backend"); err == nil {
func (p *Provider) getFrontendBackend(application marathon.Application) string {
if label, err := p.getLabel(application, "traefik.backend"); err == nil {
return label
}
return replace("/", "-", application.ID)
return provider.Replace("/", "-", application.ID)
}
func (provider *Marathon) getSubDomain(name string) string {
if provider.GroupsAsSubDomains {
func (p *Provider) getSubDomain(name string) string {
if p.GroupsAsSubDomains {
splitedName := strings.Split(strings.TrimPrefix(name, "/"), "/")
reverseStringSlice(&splitedName)
provider.ReverseStringSlice(&splitedName)
reverseName := strings.Join(splitedName, ".")
return reverseName
}
return strings.Replace(strings.TrimPrefix(name, "/"), "/", "-", -1)
}
func (provider *Marathon) hasCircuitBreakerLabels(application marathon.Application) bool {
if _, err := provider.getLabel(application, "traefik.backend.circuitbreaker.expression"); err != nil {
func (p *Provider) hasCircuitBreakerLabels(application marathon.Application) bool {
if _, err := p.getLabel(application, "traefik.backend.circuitbreaker.expression"); err != nil {
return false
}
return true
}
func (provider *Marathon) hasLoadBalancerLabels(application marathon.Application) bool {
_, errMethod := provider.getLabel(application, "traefik.backend.loadbalancer.method")
_, errSticky := provider.getLabel(application, "traefik.backend.loadbalancer.sticky")
func (p *Provider) hasLoadBalancerLabels(application marathon.Application) bool {
_, errMethod := p.getLabel(application, "traefik.backend.loadbalancer.method")
_, errSticky := p.getLabel(application, "traefik.backend.loadbalancer.sticky")
if errMethod != nil && errSticky != nil {
return false
}
return true
}
func (provider *Marathon) hasMaxConnLabels(application marathon.Application) bool {
if _, err := provider.getLabel(application, "traefik.backend.maxconn.amount"); err != nil {
func (p *Provider) hasMaxConnLabels(application marathon.Application) bool {
if _, err := p.getLabel(application, "traefik.backend.maxconn.amount"); err != nil {
return false
}
if _, err := provider.getLabel(application, "traefik.backend.maxconn.extractorfunc"); err != nil {
if _, err := p.getLabel(application, "traefik.backend.maxconn.extractorfunc"); err != nil {
return false
}
return true
}
func (provider *Marathon) getMaxConnAmount(application marathon.Application) int64 {
if label, err := provider.getLabel(application, "traefik.backend.maxconn.amount"); err == nil {
func (p *Provider) getMaxConnAmount(application marathon.Application) int64 {
if label, err := p.getLabel(application, "traefik.backend.maxconn.amount"); err == nil {
i, errConv := strconv.ParseInt(label, 10, 64)
if errConv != nil {
log.Errorf("Unable to parse traefik.backend.maxconn.amount %s", label)
@ -458,22 +459,22 @@ func (provider *Marathon) getMaxConnAmount(application marathon.Application) int
return math.MaxInt64
}
func (provider *Marathon) getMaxConnExtractorFunc(application marathon.Application) string {
if label, err := provider.getLabel(application, "traefik.backend.maxconn.extractorfunc"); err == nil {
func (p *Provider) getMaxConnExtractorFunc(application marathon.Application) string {
if label, err := p.getLabel(application, "traefik.backend.maxconn.extractorfunc"); err == nil {
return label
}
return "request.host"
}
func (provider *Marathon) getLoadBalancerMethod(application marathon.Application) string {
if label, err := provider.getLabel(application, "traefik.backend.loadbalancer.method"); err == nil {
func (p *Provider) getLoadBalancerMethod(application marathon.Application) string {
if label, err := p.getLabel(application, "traefik.backend.loadbalancer.method"); err == nil {
return label
}
return "wrr"
}
func (provider *Marathon) getCircuitBreakerExpression(application marathon.Application) string {
if label, err := provider.getLabel(application, "traefik.backend.circuitbreaker.expression"); err == nil {
func (p *Provider) getCircuitBreakerExpression(application marathon.Application) string {
if label, err := p.getLabel(application, "traefik.backend.circuitbreaker.expression"); err == nil {
return label
}
return "NetworkErrorRatio() > 1"
@ -508,7 +509,7 @@ func processPorts(application marathon.Application, task marathon.Task) []int {
return []int{}
}
func (provider *Marathon) getBackendServer(task marathon.Task, applications []marathon.Application) string {
func (p *Provider) getBackendServer(task marathon.Task, applications []marathon.Application) string {
application, err := getApplication(task, applications)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
@ -519,7 +520,7 @@ func (provider *Marathon) getBackendServer(task marathon.Task, applications []ma
} else if len(task.IPAddresses) == 1 {
return task.IPAddresses[0].IPAddress
} else {
ipAddressIdxStr, err := provider.getLabel(application, "traefik.ipAddressIdx")
ipAddressIdxStr, err := p.getLabel(application, "traefik.ipAddressIdx")
if err != nil {
log.Errorf("Unable to get marathon IPAddress from task %s", task.AppID)
return ""

View file

@ -1,4 +1,4 @@
package provider
package marathon
import (
"errors"
@ -337,7 +337,7 @@ func TestMarathonLoadConfig(t *testing.T) {
for _, c := range cases {
fakeClient := newFakeClient(c.applicationsError, c.applications, c.tasksError, c.tasks)
provider := &Marathon{
provider := &Provider{
Domain: "docker.localhost",
ExposedByDefault: true,
marathonClient: fakeClient,
@ -764,7 +764,7 @@ func TestMarathonTaskFilter(t *testing.T) {
},
}
provider := &Marathon{}
provider := &Provider{}
for _, c := range cases {
actual := provider.taskFilter(c.task, c.applications, c.exposedByDefault)
if actual != c.expected {
@ -827,7 +827,7 @@ func TestMarathonAppConstraints(t *testing.T) {
}
for _, c := range cases {
provider := &Marathon{
provider := &Provider{
MarathonLBCompatibility: c.marathonLBCompatibility,
}
constraint, _ := types.NewConstraint("tag==valid")
@ -907,7 +907,7 @@ func TestMarathonTaskConstraints(t *testing.T) {
}
for _, c := range cases {
provider := &Marathon{
provider := &Provider{
MarathonLBCompatibility: c.marathonLBCompatibility,
}
constraint, _ := types.NewConstraint("tag==valid")
@ -968,7 +968,7 @@ func TestMarathonApplicationFilter(t *testing.T) {
},
}
provider := &Marathon{}
provider := &Provider{}
for _, c := range cases {
actual := provider.applicationFilter(c.application, c.filteredTasks)
if actual != c.expected {
@ -978,7 +978,7 @@ func TestMarathonApplicationFilter(t *testing.T) {
}
func TestMarathonGetPort(t *testing.T) {
provider := &Marathon{}
provider := &Provider{}
cases := []struct {
applications []marathon.Application
@ -1082,7 +1082,7 @@ func TestMarathonGetPort(t *testing.T) {
}
func TestMarathonGetWeigh(t *testing.T) {
provider := &Marathon{}
provider := &Provider{}
applications := []struct {
applications []marathon.Application
@ -1147,7 +1147,7 @@ func TestMarathonGetWeigh(t *testing.T) {
}
func TestMarathonGetDomain(t *testing.T) {
provider := &Marathon{
provider := &Provider{
Domain: "docker.localhost",
}
@ -1179,7 +1179,7 @@ func TestMarathonGetDomain(t *testing.T) {
}
func TestMarathonGetProtocol(t *testing.T) {
provider := &Marathon{}
provider := &Provider{}
applications := []struct {
applications []marathon.Application
@ -1244,7 +1244,7 @@ func TestMarathonGetProtocol(t *testing.T) {
}
func TestMarathonGetPassHostHeader(t *testing.T) {
provider := &Marathon{}
provider := &Provider{}
applications := []struct {
application marathon.Application
@ -1274,7 +1274,7 @@ func TestMarathonGetPassHostHeader(t *testing.T) {
}
func TestMarathonGetEntryPoints(t *testing.T) {
provider := &Marathon{}
provider := &Provider{}
applications := []struct {
application marathon.Application
@ -1348,7 +1348,7 @@ func TestMarathonGetFrontendRule(t *testing.T) {
}
for _, a := range applications {
provider := &Marathon{
provider := &Provider{
Domain: "docker.localhost",
MarathonLBCompatibility: a.marathonLBCompatibility,
}
@ -1360,7 +1360,7 @@ func TestMarathonGetFrontendRule(t *testing.T) {
}
func TestMarathonGetBackend(t *testing.T) {
provider := &Marathon{}
provider := &Provider{}
applications := []struct {
application marathon.Application
@ -1386,13 +1386,13 @@ func TestMarathonGetBackend(t *testing.T) {
}
func TestMarathonGetSubDomain(t *testing.T) {
providerGroups := &Marathon{GroupsAsSubDomains: true}
providerNoGroups := &Marathon{GroupsAsSubDomains: false}
providerGroups := &Provider{GroupsAsSubDomains: true}
providerNoGroups := &Provider{GroupsAsSubDomains: false}
apps := []struct {
path string
expected string
provider *Marathon
provider *Provider
}{
{"/test", "test", providerNoGroups},
{"/test", "test", providerGroups},

View file

@ -1,4 +1,4 @@
package provider
package mesos
import (
"errors"
@ -12,6 +12,7 @@ import (
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/mesos/mesos-go/detector"
@ -24,11 +25,11 @@ import (
"github.com/mesosphere/mesos-dns/util"
)
var _ Provider = (*Mesos)(nil)
var _ provider.Provider = (*Provider)(nil)
//Mesos holds configuration of the mesos provider.
type Mesos struct {
BaseProvider
//Provider holds configuration of the provider.
type Provider struct {
provider.BaseProvider
Endpoint string `description:"Mesos server endpoint. You can also specify multiple endpoint for Mesos"`
Domain string `description:"Default domain used"`
ExposedByDefault bool `description:"Expose Mesos apps by default"`
@ -40,30 +41,30 @@ type Mesos struct {
Masters []string
}
// Provide allows the provider to provide configurations to traefik
// Provide allows the mesos provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Mesos) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
operation := func() error {
// initialize logging
logging.SetupLogs()
log.Debugf("%s", provider.IPSources)
log.Debugf("%s", p.IPSources)
var zk string
var masters []string
if strings.HasPrefix(provider.Endpoint, "zk://") {
zk = provider.Endpoint
if strings.HasPrefix(p.Endpoint, "zk://") {
zk = p.Endpoint
} else {
masters = strings.Split(provider.Endpoint, ",")
masters = strings.Split(p.Endpoint, ",")
}
errch := make(chan error)
changed := detectMasters(zk, masters)
reload := time.NewTicker(time.Second * time.Duration(provider.RefreshSeconds))
zkTimeout := time.Second * time.Duration(provider.ZkDetectionTimeout)
reload := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
zkTimeout := time.Second * time.Duration(p.ZkDetectionTimeout)
timeout := time.AfterFunc(zkTimeout, func() {
if zkTimeout > 0 {
errch <- fmt.Errorf("master detection timed out after %s", zkTimeout)
@ -73,7 +74,7 @@ func (provider *Mesos) Provide(configurationChan chan<- types.ConfigMessage, poo
defer reload.Stop()
defer util.HandleCrash()
if !provider.Watch {
if !p.Watch {
reload.Stop()
timeout.Stop()
}
@ -81,7 +82,7 @@ func (provider *Mesos) Provide(configurationChan chan<- types.ConfigMessage, poo
for {
select {
case <-reload.C:
configuration := provider.loadMesosConfig()
configuration := p.loadMesosConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "mesos",
@ -96,8 +97,8 @@ func (provider *Mesos) Provide(configurationChan chan<- types.ConfigMessage, poo
timeout.Stop()
}
log.Debugf("new masters detected: %v", masters)
provider.Masters = masters
configuration := provider.loadMesosConfig()
p.Masters = masters
configuration := p.loadMesosConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "mesos",
@ -120,34 +121,34 @@ func (provider *Mesos) Provide(configurationChan chan<- types.ConfigMessage, poo
return nil
}
func (provider *Mesos) loadMesosConfig() *types.Configuration {
func (p *Provider) loadMesosConfig() *types.Configuration {
var mesosFuncMap = template.FuncMap{
"getBackend": provider.getBackend,
"getPort": provider.getPort,
"getHost": provider.getHost,
"getWeight": provider.getWeight,
"getDomain": provider.getDomain,
"getProtocol": provider.getProtocol,
"getPassHostHeader": provider.getPassHostHeader,
"getPriority": provider.getPriority,
"getEntryPoints": provider.getEntryPoints,
"getFrontendRule": provider.getFrontendRule,
"getFrontendBackend": provider.getFrontendBackend,
"getID": provider.getID,
"getFrontEndName": provider.getFrontEndName,
"getBackend": p.getBackend,
"getPort": p.getPort,
"getHost": p.getHost,
"getWeight": p.getWeight,
"getDomain": p.getDomain,
"getProtocol": p.getProtocol,
"getPassHostHeader": p.getPassHostHeader,
"getPriority": p.getPriority,
"getEntryPoints": p.getEntryPoints,
"getFrontendRule": p.getFrontendRule,
"getFrontendBackend": p.getFrontendBackend,
"getID": p.getID,
"getFrontEndName": p.getFrontEndName,
}
t := records.NewRecordGenerator(time.Duration(provider.StateTimeoutSecond) * time.Second)
sj, err := t.FindMaster(provider.Masters...)
t := records.NewRecordGenerator(time.Duration(p.StateTimeoutSecond) * time.Second)
sj, err := t.FindMaster(p.Masters...)
if err != nil {
log.Errorf("Failed to create a client for mesos, error: %s", err)
return nil
}
tasks := provider.taskRecords(sj)
tasks := p.taskRecords(sj)
//filter tasks
filteredTasks := fun.Filter(func(task state.Task) bool {
return mesosTaskFilter(task, provider.ExposedByDefault)
return mesosTaskFilter(task, p.ExposedByDefault)
}, tasks).([]state.Task)
filteredApps := []state.Task{}
@ -164,10 +165,10 @@ func (provider *Mesos) loadMesosConfig() *types.Configuration {
}{
filteredApps,
filteredTasks,
provider.Domain,
p.Domain,
}
configuration, err := provider.GetConfiguration("templates/mesos.tmpl", mesosFuncMap, templateObjects)
configuration, err := p.GetConfiguration("templates/mesos.tmpl", mesosFuncMap, templateObjects)
if err != nil {
log.Error(err)
}
@ -261,7 +262,7 @@ func isMesosApplicationEnabled(task state.Task, exposedByDefault bool) bool {
return exposedByDefault && labels(task, "traefik.enable") != "false" || labels(task, "traefik.enable") == "true"
}
func (provider *Mesos) getLabel(task state.Task, label string) (string, error) {
func (p *Provider) getLabel(task state.Task, label string) (string, error) {
for _, tmpLabel := range task.Labels {
if tmpLabel.Key == label {
return tmpLabel.Value, nil
@ -270,19 +271,19 @@ func (provider *Mesos) getLabel(task state.Task, label string) (string, error) {
return "", errors.New("Label not found:" + label)
}
func (provider *Mesos) getPort(task state.Task, applications []state.Task) string {
func (p *Provider) getPort(task state.Task, applications []state.Task) string {
application, err := getMesos(task, applications)
if err != nil {
log.Errorf("Unable to get mesos application from task %s", task.DiscoveryInfo.Name)
return ""
}
if portIndexLabel, err := provider.getLabel(application, "traefik.portIndex"); err == nil {
if portIndexLabel, err := p.getLabel(application, "traefik.portIndex"); err == nil {
if index, err := strconv.Atoi(portIndexLabel); err == nil {
return strconv.Itoa(task.DiscoveryInfo.Ports.DiscoveryPorts[index].Number)
}
}
if portValueLabel, err := provider.getLabel(application, "traefik.port"); err == nil {
if portValueLabel, err := p.getLabel(application, "traefik.port"); err == nil {
return portValueLabel
}
@ -292,54 +293,54 @@ func (provider *Mesos) getPort(task state.Task, applications []state.Task) strin
return ""
}
func (provider *Mesos) getWeight(task state.Task, applications []state.Task) string {
func (p *Provider) getWeight(task state.Task, applications []state.Task) string {
application, errApp := getMesos(task, applications)
if errApp != nil {
log.Errorf("Unable to get mesos application from task %s", task.DiscoveryInfo.Name)
return "0"
}
if label, err := provider.getLabel(application, "traefik.weight"); err == nil {
if label, err := p.getLabel(application, "traefik.weight"); err == nil {
return label
}
return "0"
}
func (provider *Mesos) getDomain(task state.Task) string {
if label, err := provider.getLabel(task, "traefik.domain"); err == nil {
func (p *Provider) getDomain(task state.Task) string {
if label, err := p.getLabel(task, "traefik.domain"); err == nil {
return label
}
return provider.Domain
return p.Domain
}
func (provider *Mesos) getProtocol(task state.Task, applications []state.Task) string {
func (p *Provider) getProtocol(task state.Task, applications []state.Task) string {
application, errApp := getMesos(task, applications)
if errApp != nil {
log.Errorf("Unable to get mesos application from task %s", task.DiscoveryInfo.Name)
return "http"
}
if label, err := provider.getLabel(application, "traefik.protocol"); err == nil {
if label, err := p.getLabel(application, "traefik.protocol"); err == nil {
return label
}
return "http"
}
func (provider *Mesos) getPassHostHeader(task state.Task) string {
if passHostHeader, err := provider.getLabel(task, "traefik.frontend.passHostHeader"); err == nil {
func (p *Provider) getPassHostHeader(task state.Task) string {
if passHostHeader, err := p.getLabel(task, "traefik.frontend.passHostHeader"); err == nil {
return passHostHeader
}
return "false"
}
func (provider *Mesos) getPriority(task state.Task) string {
if priority, err := provider.getLabel(task, "traefik.frontend.priority"); err == nil {
func (p *Provider) getPriority(task state.Task) string {
if priority, err := p.getLabel(task, "traefik.frontend.priority"); err == nil {
return priority
}
return "0"
}
func (provider *Mesos) getEntryPoints(task state.Task) []string {
if entryPoints, err := provider.getLabel(task, "traefik.frontend.entryPoints"); err == nil {
func (p *Provider) getEntryPoints(task state.Task) []string {
if entryPoints, err := p.getLabel(task, "traefik.frontend.entryPoints"); err == nil {
return strings.Split(entryPoints, ",")
}
return []string{}
@ -347,38 +348,38 @@ func (provider *Mesos) getEntryPoints(task state.Task) []string {
// getFrontendRule returns the frontend rule for the specified application, using
// it's label. It returns a default one (Host) if the label is not present.
func (provider *Mesos) getFrontendRule(task state.Task) string {
if label, err := provider.getLabel(task, "traefik.frontend.rule"); err == nil {
func (p *Provider) getFrontendRule(task state.Task) string {
if label, err := p.getLabel(task, "traefik.frontend.rule"); err == nil {
return label
}
return "Host:" + strings.ToLower(strings.Replace(provider.getSubDomain(task.DiscoveryInfo.Name), "_", "-", -1)) + "." + provider.Domain
return "Host:" + strings.ToLower(strings.Replace(p.getSubDomain(task.DiscoveryInfo.Name), "_", "-", -1)) + "." + p.Domain
}
func (provider *Mesos) getBackend(task state.Task, applications []state.Task) string {
func (p *Provider) getBackend(task state.Task, applications []state.Task) string {
application, errApp := getMesos(task, applications)
if errApp != nil {
log.Errorf("Unable to get mesos application from task %s", task.DiscoveryInfo.Name)
return ""
}
return provider.getFrontendBackend(application)
return p.getFrontendBackend(application)
}
func (provider *Mesos) getFrontendBackend(task state.Task) string {
if label, err := provider.getLabel(task, "traefik.backend"); err == nil {
func (p *Provider) getFrontendBackend(task state.Task) string {
if label, err := p.getLabel(task, "traefik.backend"); err == nil {
return label
}
return "-" + cleanupSpecialChars(task.DiscoveryInfo.Name)
}
func (provider *Mesos) getHost(task state.Task) string {
return task.IP(strings.Split(provider.IPSources, ",")...)
func (p *Provider) getHost(task state.Task) string {
return task.IP(strings.Split(p.IPSources, ",")...)
}
func (provider *Mesos) getID(task state.Task) string {
func (p *Provider) getID(task state.Task) string {
return cleanupSpecialChars(task.ID)
}
func (provider *Mesos) getFrontEndName(task state.Task) string {
func (p *Provider) getFrontEndName(task state.Task) string {
return strings.Replace(cleanupSpecialChars(task.ID), "/", "-", -1)
}
@ -401,8 +402,8 @@ func detectMasters(zk string, masters []string) <-chan []string {
return changed
}
func (provider *Mesos) taskRecords(sj state.State) []state.Task {
var p []state.Task // == nil
func (p *Provider) taskRecords(sj state.State) []state.Task {
var tasks []state.Task // == nil
for _, f := range sj.Frameworks {
for _, task := range f.Tasks {
for _, slave := range sj.Slaves {
@ -413,12 +414,12 @@ func (provider *Mesos) taskRecords(sj state.State) []state.Task {
// only do running and discoverable tasks
if task.State == "TASK_RUNNING" {
p = append(p, task)
tasks = append(tasks, task)
}
}
}
return p
return tasks
}
// ErrorFunction A function definition that returns an error
@ -431,10 +432,10 @@ type ErrorFunction func() error
func Ignore(f ErrorFunction) {
_ = f()
}
func (provider *Mesos) getSubDomain(name string) string {
if provider.GroupsAsSubDomains {
func (p *Provider) getSubDomain(name string) string {
if p.GroupsAsSubDomains {
splitedName := strings.Split(strings.TrimPrefix(name, "/"), "/")
reverseStringSlice(&splitedName)
provider.ReverseStringSlice(&splitedName)
reverseName := strings.Join(splitedName, ".")
return reverseName
}

View file

@ -1,4 +1,4 @@
package provider
package mesos
import (
"reflect"
@ -199,7 +199,7 @@ func TestTaskRecords(t *testing.T) {
Frameworks: []state.Framework{framework},
}
provider := &Mesos{
provider := &Provider{
Domain: "docker.localhost",
ExposedByDefault: true,
}
@ -224,7 +224,7 @@ func TestMesosLoadConfig(t *testing.T) {
expectedBackends map[string]*types.Backend
}{}
for _, c := range cases {
provider := &Mesos{
provider := &Provider{
Domain: "docker.localhost",
ExposedByDefault: true,
}
@ -246,13 +246,13 @@ func TestMesosLoadConfig(t *testing.T) {
}
func TestMesosGetSubDomain(t *testing.T) {
providerGroups := &Mesos{GroupsAsSubDomains: true}
providerNoGroups := &Mesos{GroupsAsSubDomains: false}
providerGroups := &Provider{GroupsAsSubDomains: true}
providerNoGroups := &Provider{GroupsAsSubDomains: false}
apps := []struct {
path string
expected string
provider *Mesos
provider *Provider
}{
{"/test", "test", providerNoGroups},
{"/test", "test", providerGroups},

View file

@ -59,7 +59,7 @@ func (p *BaseProvider) GetConfiguration(defaultTemplateFile string, funcMap temp
)
configuration := new(types.Configuration)
var defaultFuncMap = template.FuncMap{
"replace": replace,
"replace": Replace,
"tolower": strings.ToLower,
"normalize": Normalize,
"split": split,
@ -101,7 +101,8 @@ func (p *BaseProvider) GetConfiguration(defaultTemplateFile string, funcMap temp
return configuration, nil
}
func replace(s1 string, s2 string, s3 string) string {
// Replace is an alias for strings.Replace
func Replace(s1 string, s2 string, s3 string) string {
return strings.Replace(s3, s1, s2, -1)
}
@ -122,7 +123,8 @@ func Normalize(name string) string {
return strings.Join(strings.FieldsFunc(name, fargs), "-")
}
func reverseStringSlice(slice *[]string) {
// ReverseStringSlice invert the order of the given slice of string
func ReverseStringSlice(slice *[]string) {
for i, j := 0, len(*slice)-1; i < j; i, j = i+1, j-1 {
(*slice)[i], (*slice)[j] = (*slice)[j], (*slice)[i]
}

View file

@ -169,7 +169,7 @@ func TestReplace(t *testing.T) {
}
for _, c := range cases {
actual := replace("foo", "bar", c.str)
actual := Replace("foo", "bar", c.str)
if actual != c.expected {
t.Fatalf("expected %q, got %q, for %q", c.expected, actual, c.str)
}

View file

@ -1,4 +1,4 @@
package provider
package rancher
import (
"context"
@ -15,6 +15,7 @@ import (
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
rancher "github.com/rancher/go-rancher/client"
@ -25,16 +26,16 @@ const (
RancherDefaultWatchTime = 15 * time.Second
)
var _ Provider = (*Rancher)(nil)
var _ provider.Provider = (*Provider)(nil)
// Rancher holds configurations of the Rancher provider.
type Rancher struct {
BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Rancher server HTTP(S) endpoint."`
AccessKey string `description:"Rancher server access key."`
SecretKey string `description:"Rancher server Secret Key."`
ExposedByDefault bool `description:"Expose Services by default"`
Domain string `description:"Default domain used"`
// Provider holds configurations of the provider.
type Provider struct {
provider.BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Rancher server HTTP(S) endpoint."`
AccessKey string `description:"Rancher server access key."`
SecretKey string `description:"Rancher server Secret Key."`
ExposedByDefault bool `description:"Expose Services by default"`
Domain string `description:"Default domain used"`
}
type rancherData struct {
@ -49,48 +50,48 @@ func (r rancherData) String() string {
}
// Frontend Labels
func (provider *Rancher) getPassHostHeader(service rancherData) string {
func (p *Provider) getPassHostHeader(service rancherData) string {
if passHostHeader, err := getServiceLabel(service, "traefik.frontend.passHostHeader"); err == nil {
return passHostHeader
}
return "true"
}
func (provider *Rancher) getPriority(service rancherData) string {
func (p *Provider) getPriority(service rancherData) string {
if priority, err := getServiceLabel(service, "traefik.frontend.priority"); err == nil {
return priority
}
return "0"
}
func (provider *Rancher) getEntryPoints(service rancherData) []string {
func (p *Provider) getEntryPoints(service rancherData) []string {
if entryPoints, err := getServiceLabel(service, "traefik.frontend.entryPoints"); err == nil {
return strings.Split(entryPoints, ",")
}
return []string{}
}
func (provider *Rancher) getFrontendRule(service rancherData) string {
func (p *Provider) getFrontendRule(service rancherData) string {
if label, err := getServiceLabel(service, "traefik.frontend.rule"); err == nil {
return label
}
return "Host:" + strings.ToLower(strings.Replace(service.Name, "/", ".", -1)) + "." + provider.Domain
return "Host:" + strings.ToLower(strings.Replace(service.Name, "/", ".", -1)) + "." + p.Domain
}
func (provider *Rancher) getFrontendName(service rancherData) string {
func (p *Provider) getFrontendName(service rancherData) string {
// Replace '.' with '-' in quoted keys because of this issue https://github.com/BurntSushi/toml/issues/78
return Normalize(provider.getFrontendRule(service))
return provider.Normalize(p.getFrontendRule(service))
}
// Backend Labels
func (provider *Rancher) getLoadBalancerMethod(service rancherData) string {
func (p *Provider) getLoadBalancerMethod(service rancherData) string {
if label, err := getServiceLabel(service, "traefik.backend.loadbalancer.method"); err == nil {
return label
}
return "wrr"
}
func (provider *Rancher) hasLoadBalancerLabel(service rancherData) bool {
func (p *Provider) hasLoadBalancerLabel(service rancherData) bool {
_, errMethod := getServiceLabel(service, "traefik.backend.loadbalancer.method")
_, errSticky := getServiceLabel(service, "traefik.backend.loadbalancer.sticky")
if errMethod != nil && errSticky != nil {
@ -99,64 +100,64 @@ func (provider *Rancher) hasLoadBalancerLabel(service rancherData) bool {
return true
}
func (provider *Rancher) hasCircuitBreakerLabel(service rancherData) bool {
func (p *Provider) hasCircuitBreakerLabel(service rancherData) bool {
if _, err := getServiceLabel(service, "traefik.backend.circuitbreaker.expression"); err != nil {
return false
}
return true
}
func (provider *Rancher) getCircuitBreakerExpression(service rancherData) string {
func (p *Provider) getCircuitBreakerExpression(service rancherData) string {
if label, err := getServiceLabel(service, "traefik.backend.circuitbreaker.expression"); err == nil {
return label
}
return "NetworkErrorRatio() > 1"
}
func (provider *Rancher) getSticky(service rancherData) string {
func (p *Provider) getSticky(service rancherData) string {
if _, err := getServiceLabel(service, "traefik.backend.loadbalancer.sticky"); err == nil {
return "true"
}
return "false"
}
func (provider *Rancher) getBackend(service rancherData) string {
func (p *Provider) getBackend(service rancherData) string {
if label, err := getServiceLabel(service, "traefik.backend"); err == nil {
return Normalize(label)
return provider.Normalize(label)
}
return Normalize(service.Name)
return provider.Normalize(service.Name)
}
// Generall Application Stuff
func (provider *Rancher) getPort(service rancherData) string {
func (p *Provider) getPort(service rancherData) string {
if label, err := getServiceLabel(service, "traefik.port"); err == nil {
return label
}
return ""
}
func (provider *Rancher) getProtocol(service rancherData) string {
func (p *Provider) getProtocol(service rancherData) string {
if label, err := getServiceLabel(service, "traefik.protocol"); err == nil {
return label
}
return "http"
}
func (provider *Rancher) getWeight(service rancherData) string {
func (p *Provider) getWeight(service rancherData) string {
if label, err := getServiceLabel(service, "traefik.weight"); err == nil {
return label
}
return "0"
}
func (provider *Rancher) getDomain(service rancherData) string {
func (p *Provider) getDomain(service rancherData) string {
if label, err := getServiceLabel(service, "traefik.domain"); err == nil {
return label
}
return provider.Domain
return p.Domain
}
func (provider *Rancher) hasMaxConnLabels(service rancherData) bool {
func (p *Provider) hasMaxConnLabels(service rancherData) bool {
if _, err := getServiceLabel(service, "traefik.backend.maxconn.amount"); err != nil {
return false
}
@ -166,7 +167,7 @@ func (provider *Rancher) hasMaxConnLabels(service rancherData) bool {
return true
}
func (provider *Rancher) getMaxConnAmount(service rancherData) int64 {
func (p *Provider) getMaxConnAmount(service rancherData) int64 {
if label, err := getServiceLabel(service, "traefik.backend.maxconn.amount"); err == nil {
i, errConv := strconv.ParseInt(label, 10, 64)
if errConv != nil {
@ -178,7 +179,7 @@ func (provider *Rancher) getMaxConnAmount(service rancherData) int64 {
return math.MaxInt64
}
func (provider *Rancher) getMaxConnExtractorFunc(service rancherData) string {
func (p *Provider) getMaxConnExtractorFunc(service rancherData) string {
if label, err := getServiceLabel(service, "traefik.backend.maxconn.extractorfunc"); err == nil {
return label
}
@ -194,11 +195,11 @@ func getServiceLabel(service rancherData, label string) (string, error) {
return "", errors.New("Label not found:" + label)
}
func (provider *Rancher) createClient() (*rancher.RancherClient, error) {
func (p *Provider) createClient() (*rancher.RancherClient, error) {
rancherURL := getenv("CATTLE_URL", provider.Endpoint)
accessKey := getenv("CATTLE_ACCESS_KEY", provider.AccessKey)
secretKey := getenv("CATTLE_SECRET_KEY", provider.SecretKey)
rancherURL := getenv("CATTLE_URL", p.Endpoint)
accessKey := getenv("CATTLE_ACCESS_KEY", p.AccessKey)
secretKey := getenv("CATTLE_SECRET_KEY", p.SecretKey)
return rancher.NewRancherClient(&rancher.ClientOpts{
Url: rancherURL,
@ -215,13 +216,13 @@ func getenv(key, fallback string) string {
return value
}
// Provide allows the provider to provide configurations to traefik
// Provide allows the rancher provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Rancher) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
safe.Go(func() {
operation := func() error {
rancherClient, err := provider.createClient()
rancherClient, err := p.createClient()
if err != nil {
log.Errorf("Failed to create a client for rancher, error: %s", err)
@ -235,13 +236,13 @@ func (provider *Rancher) Provide(configurationChan chan<- types.ConfigMessage, p
var rancherData = parseRancherData(environments, services, container)
configuration := provider.loadRancherConfig(rancherData)
configuration := p.loadRancherConfig(rancherData)
configurationChan <- types.ConfigMessage{
ProviderName: "rancher",
Configuration: configuration,
}
if provider.Watch {
if p.Watch {
_, cancel := context.WithCancel(ctx)
ticker := time.NewTicker(RancherDefaultWatchTime)
pool.Go(func(stop chan bool) {
@ -249,14 +250,14 @@ func (provider *Rancher) Provide(configurationChan chan<- types.ConfigMessage, p
select {
case <-ticker.C:
log.Debugf("Refreshing new Data from Rancher API")
log.Debugf("Refreshing new Data from Provider API")
var environments = listRancherEnvironments(rancherClient)
var services = listRancherServices(rancherClient)
var container = listRancherContainer(rancherClient)
rancherData := parseRancherData(environments, services, container)
configuration := provider.loadRancherConfig(rancherData)
configuration := p.loadRancherConfig(rancherData)
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "rancher",
@ -275,11 +276,11 @@ func (provider *Rancher) Provide(configurationChan chan<- types.ConfigMessage, p
return nil
}
notify := func(err error, time time.Duration) {
log.Errorf("Rancher connection error %+v, retrying in %s", err, time)
log.Errorf("Provider connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Rancher Endpoint %+v", err)
log.Errorf("Cannot connect to Provider Endpoint %+v", err)
}
})
@ -293,7 +294,7 @@ func listRancherEnvironments(client *rancher.RancherClient) []*rancher.Environme
environments, err := client.Environment.List(nil)
if err != nil {
log.Errorf("Cannot get Rancher Environments %+v", err)
log.Errorf("Cannot get Provider Environments %+v", err)
}
for k := range environments.Data {
@ -310,7 +311,7 @@ func listRancherServices(client *rancher.RancherClient) []*rancher.Service {
services, err := client.Service.List(nil)
if err != nil {
log.Errorf("Cannot get Rancher Services %+v", err)
log.Errorf("Cannot get Provider Services %+v", err)
}
for k := range services.Data {
@ -329,7 +330,7 @@ func listRancherContainer(client *rancher.RancherClient) []*rancher.Container {
log.Debugf("first container len: %i", len(container.Data))
if err != nil {
log.Errorf("Cannot get Rancher Services %+v", err)
log.Errorf("Cannot get Provider Services %+v", err)
}
valid := true
@ -389,40 +390,40 @@ func parseRancherData(environments []*rancher.Environment, services []*rancher.S
return rancherDataList
}
func (provider *Rancher) loadRancherConfig(services []rancherData) *types.Configuration {
func (p *Provider) loadRancherConfig(services []rancherData) *types.Configuration {
var RancherFuncMap = template.FuncMap{
"getPort": provider.getPort,
"getBackend": provider.getBackend,
"getWeight": provider.getWeight,
"getDomain": provider.getDomain,
"getProtocol": provider.getProtocol,
"getPassHostHeader": provider.getPassHostHeader,
"getPriority": provider.getPriority,
"getEntryPoints": provider.getEntryPoints,
"getFrontendRule": provider.getFrontendRule,
"hasCircuitBreakerLabel": provider.hasCircuitBreakerLabel,
"getCircuitBreakerExpression": provider.getCircuitBreakerExpression,
"hasLoadBalancerLabel": provider.hasLoadBalancerLabel,
"getLoadBalancerMethod": provider.getLoadBalancerMethod,
"hasMaxConnLabels": provider.hasMaxConnLabels,
"getMaxConnAmount": provider.getMaxConnAmount,
"getMaxConnExtractorFunc": provider.getMaxConnExtractorFunc,
"getSticky": provider.getSticky,
"getPort": p.getPort,
"getBackend": p.getBackend,
"getWeight": p.getWeight,
"getDomain": p.getDomain,
"getProtocol": p.getProtocol,
"getPassHostHeader": p.getPassHostHeader,
"getPriority": p.getPriority,
"getEntryPoints": p.getEntryPoints,
"getFrontendRule": p.getFrontendRule,
"hasCircuitBreakerLabel": p.hasCircuitBreakerLabel,
"getCircuitBreakerExpression": p.getCircuitBreakerExpression,
"hasLoadBalancerLabel": p.hasLoadBalancerLabel,
"getLoadBalancerMethod": p.getLoadBalancerMethod,
"hasMaxConnLabels": p.hasMaxConnLabels,
"getMaxConnAmount": p.getMaxConnAmount,
"getMaxConnExtractorFunc": p.getMaxConnExtractorFunc,
"getSticky": p.getSticky,
}
// filter services
filteredServices := fun.Filter(func(service rancherData) bool {
return provider.serviceFilter(service)
return p.serviceFilter(service)
}, services).([]rancherData)
frontends := map[string]rancherData{}
backends := map[string]rancherData{}
for _, service := range filteredServices {
frontendName := provider.getFrontendName(service)
frontendName := p.getFrontendName(service)
frontends[frontendName] = service
backendName := provider.getBackend(service)
backendName := p.getBackend(service)
backends[backendName] = service
}
@ -433,10 +434,10 @@ func (provider *Rancher) loadRancherConfig(services []rancherData) *types.Config
}{
frontends,
backends,
provider.Domain,
p.Domain,
}
configuration, err := provider.GetConfiguration("templates/rancher.tmpl", RancherFuncMap, templateObjects)
configuration, err := p.GetConfiguration("templates/rancher.tmpl", RancherFuncMap, templateObjects)
if err != nil {
log.Error(err)
}
@ -444,14 +445,14 @@ func (provider *Rancher) loadRancherConfig(services []rancherData) *types.Config
}
func (provider *Rancher) serviceFilter(service rancherData) bool {
func (p *Provider) serviceFilter(service rancherData) bool {
if service.Labels["traefik.port"] == "" {
log.Debugf("Filtering service %s without traefik.port label", service.Name)
return false
}
if !isServiceEnabled(service, provider.ExposedByDefault) {
if !isServiceEnabled(service, p.ExposedByDefault) {
log.Debugf("Filtering disabled service %s", service.Name)
return false
}

View file

@ -1,4 +1,4 @@
package provider
package rancher
import (
"github.com/containous/traefik/types"
@ -8,7 +8,7 @@ import (
)
func TestRancherGetFrontendName(t *testing.T) {
provider := &Rancher{
provider := &Provider{
Domain: "rancher.localhost",
}
@ -73,7 +73,7 @@ func TestRancherGetFrontendName(t *testing.T) {
}
func TestRancherGetFrontendRule(t *testing.T) {
provider := &Rancher{
provider := &Provider{
Domain: "rancher.localhost",
}
@ -134,7 +134,7 @@ func TestRancherGetFrontendRule(t *testing.T) {
}
func TestRancherGetBackend(t *testing.T) {
provider := &Rancher{
provider := &Provider{
Domain: "rancher.localhost",
}
@ -169,7 +169,7 @@ func TestRancherGetBackend(t *testing.T) {
}
func TestRancherGetWeight(t *testing.T) {
provider := &Rancher{
provider := &Provider{
Domain: "rancher.localhost",
}
@ -204,7 +204,7 @@ func TestRancherGetWeight(t *testing.T) {
}
func TestRancherGetPort(t *testing.T) {
provider := &Rancher{
provider := &Provider{
Domain: "rancher.localhost",
}
@ -239,7 +239,7 @@ func TestRancherGetPort(t *testing.T) {
}
func TestRancherGetDomain(t *testing.T) {
provider := &Rancher{
provider := &Provider{
Domain: "rancher.localhost",
}
@ -274,7 +274,7 @@ func TestRancherGetDomain(t *testing.T) {
}
func TestRancherGetProtocol(t *testing.T) {
provider := &Rancher{
provider := &Provider{
Domain: "rancher.localhost",
}
@ -309,7 +309,7 @@ func TestRancherGetProtocol(t *testing.T) {
}
func TestRancherGetPassHostHeader(t *testing.T) {
provider := &Rancher{
provider := &Provider{
Domain: "rancher.localhost",
}
@ -430,7 +430,7 @@ func TestRancherLoadRancherConfig(t *testing.T) {
},
}
provider := &Rancher{
provider := &Provider{
Domain: "rancher.localhost",
ExposedByDefault: true,
}

View file

@ -1,35 +0,0 @@
package provider
import (
"fmt"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/zookeeper"
)
var _ Provider = (*Zookepper)(nil)
// Zookepper holds configurations of the Zookepper provider.
type Zookepper struct {
Kv `mapstructure:",squash"`
}
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Zookepper) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
store, err := provider.CreateStore()
if err != nil {
return fmt.Errorf("Failed to Connect to KV store: %v", err)
}
provider.kvclient = store
return provider.provide(configurationChan, pool, constraints)
}
// CreateStore creates the KV store
func (provider *Zookepper) CreateStore() (store.Store, error) {
provider.storeType = store.ZK
zookeeper.Register()
return provider.createStore()
}

37
provider/zk/zk.go Normal file
View file

@ -0,0 +1,37 @@
package zk
import (
"fmt"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/provider/kv"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/zookeeper"
)
var _ provider.Provider = (*Provider)(nil)
// Provider holds configurations of the provider.
type Provider struct {
kv.Provider `mapstructure:",squash"`
}
// Provide allows the zk provider to Provide configurations to traefik
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
store, err := p.CreateStore()
if err != nil {
return fmt.Errorf("Failed to Connect to KV store: %v", err)
}
p.Kvclient = store
return p.Provider.Provide(configurationChan, pool, constraints)
}
// CreateStore creates the KV store
func (p *Provider) CreateStore() (store.Store, error) {
p.StoreType = store.ZK
zookeeper.Register()
return p.Provider.CreateStore()
}

View file

@ -20,7 +20,7 @@ import (
"github.com/containous/traefik/cmd"
"github.com/containous/traefik/log"
"github.com/containous/traefik/middlewares"
"github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/provider/kubernetes"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/containous/traefik/version"
@ -104,7 +104,7 @@ Complete documentation is available at https://traefik.io`,
f.AddParser(reflect.TypeOf(EntryPoints{}), &EntryPoints{})
f.AddParser(reflect.TypeOf(DefaultEntryPoints{}), &DefaultEntryPoints{})
f.AddParser(reflect.TypeOf(types.Constraints{}), &types.Constraints{})
f.AddParser(reflect.TypeOf(k8s.Namespaces{}), &k8s.Namespaces{})
f.AddParser(reflect.TypeOf(kubernetes.Namespaces{}), &kubernetes.Namespaces{})
f.AddParser(reflect.TypeOf([]acme.Domain{}), &acme.Domains{})
f.AddParser(reflect.TypeOf(types.Buckets{}), &types.Buckets{})