2016-02-02 18:03:40 +01:00
|
|
|
package provider
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"strings"
|
|
|
|
"text/template"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
|
|
"github.com/cenkalti/backoff"
|
2016-02-24 16:43:39 +01:00
|
|
|
"github.com/containous/traefik/types"
|
2016-02-02 18:03:40 +01:00
|
|
|
"github.com/hashicorp/consul/api"
|
|
|
|
)
|
|
|
|
|
|
|
|
// DefaultWatchWaitTime is the duration to wait when polling consul.
const DefaultWatchWaitTime = 15 * time.Second
|
|
|
|
|
|
|
|
// ConsulCatalog holds configurations of the Consul catalog provider.
|
|
|
|
type ConsulCatalog struct {
|
|
|
|
BaseProvider `mapstructure:",squash"`
|
|
|
|
Endpoint string
|
|
|
|
Domain string
|
|
|
|
client *api.Client
|
|
|
|
}
|
|
|
|
|
|
|
|
type catalogUpdate struct {
|
|
|
|
Service string
|
|
|
|
Nodes []*api.ServiceEntry
|
|
|
|
}
|
|
|
|
|
|
|
|
func (provider *ConsulCatalog) watchServices(stopCh <-chan struct{}) <-chan map[string][]string {
|
|
|
|
watchCh := make(chan map[string][]string)
|
|
|
|
|
|
|
|
catalog := provider.client.Catalog()
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
defer close(watchCh)
|
|
|
|
|
|
|
|
opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-stopCh:
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
data, meta, err := catalog.Services(opts)
|
|
|
|
if err != nil {
|
|
|
|
log.WithError(err).Errorf("Failed to list services")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// If LastIndex didn't change then it means `Get` returned
|
|
|
|
// because of the WaitTime and the key didn't changed.
|
|
|
|
if opts.WaitIndex == meta.LastIndex {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
opts.WaitIndex = meta.LastIndex
|
|
|
|
|
|
|
|
if data != nil {
|
|
|
|
watchCh <- data
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return watchCh
|
|
|
|
}
|
|
|
|
|
|
|
|
func (provider *ConsulCatalog) healthyNodes(service string) (catalogUpdate, error) {
|
|
|
|
health := provider.client.Health()
|
|
|
|
opts := &api.QueryOptions{}
|
|
|
|
data, _, err := health.Service(service, "", true, opts)
|
|
|
|
if err != nil {
|
|
|
|
log.WithError(err).Errorf("Failed to fetch details of " + service)
|
|
|
|
return catalogUpdate{}, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return catalogUpdate{
|
|
|
|
Service: service,
|
|
|
|
Nodes: data,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (provider *ConsulCatalog) getBackend(node *api.ServiceEntry) string {
|
|
|
|
return strings.ToLower(node.Service.Service)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (provider *ConsulCatalog) getFrontendValue(service string) string {
|
|
|
|
return service + "." + provider.Domain
|
|
|
|
}
|
|
|
|
|
|
|
|
func (provider *ConsulCatalog) buildConfig(catalog []catalogUpdate) *types.Configuration {
|
|
|
|
var FuncMap = template.FuncMap{
|
|
|
|
"getBackend": provider.getBackend,
|
|
|
|
"getFrontendValue": provider.getFrontendValue,
|
|
|
|
"replace": replace,
|
|
|
|
}
|
|
|
|
|
|
|
|
allNodes := []*api.ServiceEntry{}
|
|
|
|
serviceNames := []string{}
|
|
|
|
for _, info := range catalog {
|
|
|
|
if len(info.Nodes) > 0 {
|
|
|
|
serviceNames = append(serviceNames, info.Service)
|
|
|
|
allNodes = append(allNodes, info.Nodes...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
templateObjects := struct {
|
|
|
|
Services []string
|
|
|
|
Nodes []*api.ServiceEntry
|
|
|
|
}{
|
|
|
|
Services: serviceNames,
|
|
|
|
Nodes: allNodes,
|
|
|
|
}
|
|
|
|
|
|
|
|
configuration, err := provider.getConfiguration("templates/consul_catalog.tmpl", FuncMap, templateObjects)
|
|
|
|
if err != nil {
|
|
|
|
log.WithError(err).Error("Failed to create config")
|
|
|
|
}
|
|
|
|
|
|
|
|
return configuration
|
|
|
|
}
|
|
|
|
|
|
|
|
func (provider *ConsulCatalog) getNodes(index map[string][]string) ([]catalogUpdate, error) {
|
|
|
|
visited := make(map[string]bool)
|
|
|
|
|
|
|
|
nodes := []catalogUpdate{}
|
|
|
|
for service := range index {
|
|
|
|
name := strings.ToLower(service)
|
|
|
|
if !strings.Contains(name, " ") && !visited[name] {
|
|
|
|
visited[name] = true
|
|
|
|
log.WithFields(log.Fields{
|
|
|
|
"service": name,
|
|
|
|
}).Debug("Fetching service")
|
|
|
|
healthy, err := provider.healthyNodes(name)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
nodes = append(nodes, healthy)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nodes, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessage) error {
|
|
|
|
stopCh := make(chan struct{})
|
|
|
|
serviceCatalog := provider.watchServices(stopCh)
|
|
|
|
|
|
|
|
defer close(stopCh)
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case index, ok := <-serviceCatalog:
|
|
|
|
if !ok {
|
|
|
|
return errors.New("Consul service list nil")
|
|
|
|
}
|
|
|
|
log.Debug("List of services changed")
|
|
|
|
nodes, err := provider.getNodes(index)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
configuration := provider.buildConfig(nodes)
|
|
|
|
configurationChan <- types.ConfigMessage{
|
|
|
|
ProviderName: "consul_catalog",
|
|
|
|
Configuration: configuration,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Provide allows the provider to provide configurations to traefik
|
|
|
|
// using the given configuration channel.
|
|
|
|
func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMessage) error {
|
|
|
|
config := api.DefaultConfig()
|
|
|
|
config.Address = provider.Endpoint
|
|
|
|
client, err := api.NewClient(config)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
provider.client = client
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
notify := func(err error, time time.Duration) {
|
|
|
|
log.Errorf("Consul connection error %+v, retrying in %s", err, time)
|
|
|
|
}
|
|
|
|
worker := func() error {
|
|
|
|
return provider.watch(configurationChan)
|
|
|
|
}
|
|
|
|
err := backoff.RetryNotify(worker, backoff.NewExponentialBackOff(), notify)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("Cannot connect to consul server %+v", err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|