traefik/provider/consul/consul_catalog.go

424 lines
12 KiB
Go
Raw Normal View History

package consul
2016-02-02 18:03:40 +01:00
import (
"bytes"
2016-02-02 18:03:40 +01:00
"errors"
"sort"
"strconv"
2016-02-02 18:03:40 +01:00
"strings"
"text/template"
"time"
"github.com/BurntSushi/ty/fun"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
2016-02-02 18:03:40 +01:00
"github.com/hashicorp/consul/api"
)
// DefaultWatchWaitTime is the wait time set on the Consul catalog query
// issued while watching for service changes.
const DefaultWatchWaitTime = 15 * time.Second
// Compile-time assertion that *CatalogProvider satisfies provider.Provider.
var _ provider.Provider = (*CatalogProvider)(nil)
// CatalogProvider holds configurations of the Consul catalog provider.
//
// The exported fields are user-facing settings; the unexported fields are
// populated by Provide before the watch loop starts.
type CatalogProvider struct {
provider.BaseProvider `mapstructure:",squash"`
// Endpoint is used as the address of the Consul API client.
Endpoint string `description:"Consul server endpoint"`
// Domain is exposed to the frontend rule template as .Domain.
Domain string `description:"Default domain used"`
// Prefix, when non-empty, is prepended (followed by a dot) to attribute
// names before they are looked up among the Consul tags.
Prefix string `description:"Prefix used for Consul catalog tags"`
// FrontEndRule is the rule template used when a service carries no
// "frontend.rule" attribute of its own.
FrontEndRule string `description:"Frontend rule used for Consul services"`
// client is the Consul API client created in Provide.
client *api.Client
// frontEndRuleTemplate is the base template (with helper functions
// registered) that frontend rules are parsed into.
frontEndRuleTemplate *template.Template
}
// serviceUpdate describes a single Consul service: its name and the tags
// (attributes) gathered from its nodes.
type serviceUpdate struct {
	ServiceName string
	Attributes  []string
}
type catalogUpdate struct {
Service *serviceUpdate
2016-02-02 18:03:40 +01:00
Nodes []*api.ServiceEntry
}
// nodeSorter implements sort.Interface over Consul service entries so that
// they can be put into a stable, reproducible order.
type nodeSorter []*api.ServiceEntry
// Len returns the number of entries (part of sort.Interface).
func (a nodeSorter) Len() int {
return len(a)
}
// Swap exchanges the entries at positions i and j (part of sort.Interface).
func (a nodeSorter) Swap(i int, j int) {
a[i], a[j] = a[j], a[i]
}
// Less orders entries by lower-cased service name, then service address,
// then node address, and finally service port (part of sort.Interface).
func (a nodeSorter) Less(i int, j int) bool {
	left, right := a[i], a[j]
	leftName := strings.ToLower(left.Service.Service)
	rightName := strings.ToLower(right.Service.Service)

	switch {
	case leftName != rightName:
		return leftName < rightName
	case left.Service.Address != right.Service.Address:
		return left.Service.Address < right.Service.Address
	case left.Node.Address != right.Node.Address:
		return left.Node.Address < right.Node.Address
	default:
		return left.Service.Port < right.Service.Port
	}
}
func (p *CatalogProvider) watchServices(stopCh <-chan struct{}) <-chan map[string][]string {
2016-02-02 18:03:40 +01:00
watchCh := make(chan map[string][]string)
catalog := p.client.Catalog()
health := p.client.Health()
var lastHealthIndex uint64
2016-02-02 18:03:40 +01:00
safe.Go(func() {
2016-02-02 18:03:40 +01:00
defer close(watchCh)
catalogOptions := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
healthOptions := &api.QueryOptions{}
2016-02-02 18:03:40 +01:00
for {
select {
case <-stopCh:
return
default:
}
data, catalogMeta, err := catalog.Services(catalogOptions)
2016-02-02 18:03:40 +01:00
if err != nil {
log.WithError(err).Error("Failed to list services")
2016-02-02 18:03:40 +01:00
return
}
// Listening to changes that leads to `passing` state or degrades from it.
// The call is used just as a trigger for further actions
// (intentionally there is no interest in the received data).
_, healthMeta, err := health.State("passing", healthOptions)
if err != nil {
log.WithError(err).Error("Failed to retrieve health checks")
return
}
2016-02-02 18:03:40 +01:00
// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the key didn't changed.
sameServiceAmount := catalogOptions.WaitIndex == catalogMeta.LastIndex
sameServiceHealth := lastHealthIndex == healthMeta.LastIndex
if sameServiceAmount && sameServiceHealth {
2016-02-02 18:03:40 +01:00
continue
}
catalogOptions.WaitIndex = catalogMeta.LastIndex
lastHealthIndex = healthMeta.LastIndex
2016-02-02 18:03:40 +01:00
if data != nil {
watchCh <- data
}
}
})
2016-02-02 18:03:40 +01:00
return watchCh
}
func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) {
health := p.client.Health()
2016-02-02 18:03:40 +01:00
opts := &api.QueryOptions{}
data, _, err := health.Service(service, "", true, opts)
if err != nil {
log.WithError(err).Errorf("Failed to fetch details of %s", service)
2016-02-02 18:03:40 +01:00
return catalogUpdate{}, err
}
nodes := fun.Filter(func(node *api.ServiceEntry) bool {
constraintTags := p.getConstraintTags(node.Service.Tags)
ok, failingConstraint := p.MatchConstraints(constraintTags)
if !ok && failingConstraint != nil {
log.Debugf("Service %v pruned by '%v' constraint", service, failingConstraint.String())
}
return ok
}, data).([]*api.ServiceEntry)
//Merge tags of nodes matching constraints, in a single slice.
tags := fun.Foldl(func(node *api.ServiceEntry, set []string) []string {
return fun.Keys(fun.Union(
fun.Set(set),
fun.Set(node.Service.Tags),
).(map[string]bool)).([]string)
}, []string{}, nodes).([]string)
2016-02-02 18:03:40 +01:00
return catalogUpdate{
Service: &serviceUpdate{
ServiceName: service,
Attributes: tags,
},
Nodes: nodes,
2016-02-02 18:03:40 +01:00
}, nil
}
// getPrefixedName returns name qualified with the configured Prefix
// ("<Prefix>.<name>"), or name unchanged when no Prefix is set.
func (p *CatalogProvider) getPrefixedName(name string) string {
	if p.Prefix == "" {
		return name
	}
	return p.Prefix + "." + name
}
// getEntryPoints splits a comma-separated list of entry point names.
func (p *CatalogProvider) getEntryPoints(list string) []string {
	entryPoints := strings.SplitN(list, ",", -1)
	return entryPoints
}
func (p *CatalogProvider) getBackend(node *api.ServiceEntry) string {
2016-02-02 18:03:40 +01:00
return strings.ToLower(node.Service.Service)
}
// getFrontendRule renders the frontend rule for a service. The rule text is
// taken from the service's "frontend.rule" attribute, falling back to the
// provider's FrontEndRule, and is executed as a template with ServiceName,
// Domain and Attributes available. An empty string is returned (and the
// error logged) when the rule cannot be parsed or executed.
func (p *CatalogProvider) getFrontendRule(service serviceUpdate) string {
	rule := p.getAttribute("frontend.rule", service.Attributes, "")
	if rule == "" {
		rule = p.FrontEndRule
	}

	tmpl, err := p.frontEndRuleTemplate.Parse(rule)
	if err != nil {
		log.Errorf("failed to parse Consul Catalog custom frontend rule: %s", err)
		return ""
	}

	data := struct {
		ServiceName string
		Domain      string
		Attributes  []string
	}{
		ServiceName: service.ServiceName,
		Domain:      p.Domain,
		Attributes:  service.Attributes,
	}

	var rendered bytes.Buffer
	if err = tmpl.Execute(&rendered, data); err != nil {
		log.Errorf("failed to execute Consul Catalog custom frontend rule template: %s", err)
		return ""
	}

	return rendered.String()
}
// getBackendAddress returns the address to reach a node's service: the
// service-level address when one is set, otherwise the node's address.
func (p *CatalogProvider) getBackendAddress(node *api.ServiceEntry) string {
	if serviceAddress := node.Service.Address; serviceAddress != "" {
		return serviceAddress
	}
	return node.Node.Address
}
// getBackendName builds a server name for a node from the lower-cased
// service name, the service address, the port and every normalized tag,
// joined by "--". Dots and equal signs are replaced with dashes, and index
// is appended last to keep the name unique.
func (p *CatalogProvider) getBackendName(node *api.ServiceEntry, index int) string {
	parts := []string{
		strings.ToLower(node.Service.Service),
		node.Service.Address,
		strconv.Itoa(node.Service.Port),
	}
	for _, tag := range node.Service.Tags {
		parts = append(parts, provider.Normalize(tag))
	}

	sanitized := strings.NewReplacer(".", "-", "=", "-").Replace(strings.Join(parts, "--"))

	// unique int at the end
	return sanitized + "--" + strconv.Itoa(index)
}
// getAttribute looks up the attribute called name (qualified with the
// provider's prefix) among tags, returning defaultValue when it is absent.
func (p *CatalogProvider) getAttribute(name string, tags []string, defaultValue string) string {
	prefixed := p.getPrefixedName(name)
	return p.getTag(prefixed, tags, defaultValue)
}
// hasTag reports whether tags contains a tag called name, either as a bare
// marker or as the key of a key=value pair.
func (p *CatalogProvider) hasTag(name string, tags []string) bool {
	// Sentinel default: it is very unlikely that a real Consul tag would
	// ever resolve to '=!=', so getting it back means "not found".
	const missing = "=!="
	return p.getTag(name, tags, missing) != missing
}
func (p *CatalogProvider) getTag(name string, tags []string, defaultValue string) string {
for _, tag := range tags {
// Given the nature of Consul tags, which could be either singular markers, or key=value pairs, we check if the consul tag starts with 'name'
if strings.Index(strings.ToLower(tag), strings.ToLower(name)) == 0 {
// In case, where a tag might be a key=value, try to split it by the first '='
// - If the first element (which would always be there, even if the tag is a singular marker without '=' in it
if kv := strings.SplitN(tag, "=", 2); strings.ToLower(kv[0]) == strings.ToLower(name) {
// If the returned result is a key=value pair, return the 'value' component
if len(kv) == 2 {
return kv[1]
}
// If the returned result is a singular marker, return the 'key' component
return kv[0]
}
}
}
return defaultValue
2016-02-02 18:03:40 +01:00
}
// getConstraintTags collects the values used for constraint matching from
// the given Consul tags.
//
// Every tag of the form "<prefix>.tags=a,b,c" ("tags=a,b,c" when no prefix
// is configured) contributes its comma-separated values to the result. The
// key is matched case-insensitively, consistently with getTag. The result is
// nil when no such tag exists.
func (p *CatalogProvider) getConstraintTags(tags []string) []string {
	prefix := p.getPrefixedName("tags=")

	var list []string
	for _, tag := range tags {
		// The length guard keeps the slicing below in bounds; EqualFold
		// makes the comparison case-insensitive on both sides.
		if len(tag) < len(prefix) || !strings.EqualFold(tag[:len(prefix)], prefix) {
			continue
		}
		// Take the tag value and split it by ',', adding the result to the
		// list to be returned.
		list = append(list, strings.Split(tag[len(prefix):], ",")...)
	}
	return list
}
func (p *CatalogProvider) buildConfig(catalog []catalogUpdate) *types.Configuration {
2016-02-02 18:03:40 +01:00
var FuncMap = template.FuncMap{
"getBackend": p.getBackend,
"getFrontendRule": p.getFrontendRule,
"getBackendName": p.getBackendName,
"getBackendAddress": p.getBackendAddress,
"getAttribute": p.getAttribute,
"getTag": p.getTag,
"hasTag": p.hasTag,
"getEntryPoints": p.getEntryPoints,
"hasMaxconnAttributes": p.hasMaxconnAttributes,
2016-02-02 18:03:40 +01:00
}
allNodes := []*api.ServiceEntry{}
services := []*serviceUpdate{}
2016-02-02 18:03:40 +01:00
for _, info := range catalog {
for _, node := range info.Nodes {
isEnabled := p.getAttribute("enable", node.Service.Tags, "true")
if isEnabled != "false" && len(info.Nodes) > 0 {
services = append(services, info.Service)
allNodes = append(allNodes, info.Nodes...)
break
}
2016-02-02 18:03:40 +01:00
}
}
// Ensure a stable ordering of nodes so that identical configurations may be detected
sort.Sort(nodeSorter(allNodes))
2016-02-02 18:03:40 +01:00
templateObjects := struct {
Services []*serviceUpdate
2016-02-02 18:03:40 +01:00
Nodes []*api.ServiceEntry
}{
Services: services,
2016-02-02 18:03:40 +01:00
Nodes: allNodes,
}
configuration, err := p.GetConfiguration("templates/consul_catalog.tmpl", FuncMap, templateObjects)
2016-02-02 18:03:40 +01:00
if err != nil {
log.WithError(err).Error("Failed to create config")
}
return configuration
}
func (p *CatalogProvider) hasMaxconnAttributes(attributes []string) bool {
amount := p.getAttribute("backend.maxconn.amount", attributes, "")
extractorfunc := p.getAttribute("backend.maxconn.extractorfunc", attributes, "")
2016-08-24 23:46:47 -04:00
if amount != "" && extractorfunc != "" {
return true
}
return false
}
func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate, error) {
2016-02-02 18:03:40 +01:00
visited := make(map[string]bool)
nodes := []catalogUpdate{}
for service := range index {
name := strings.ToLower(service)
if !strings.Contains(name, " ") && !visited[name] {
visited[name] = true
log.WithField("service", name).Debug("Fetching service")
healthy, err := p.healthyNodes(name)
2016-02-02 18:03:40 +01:00
if err != nil {
return nil, err
}
// healthy.Nodes can be empty if constraints do not match, without throwing error
if healthy.Service != nil && len(healthy.Nodes) > 0 {
nodes = append(nodes, healthy)
}
2016-02-02 18:03:40 +01:00
}
}
return nodes, nil
}
func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error {
2016-02-02 18:03:40 +01:00
stopCh := make(chan struct{})
serviceCatalog := p.watchServices(stopCh)
2016-02-02 18:03:40 +01:00
defer close(stopCh)
for {
select {
case <-stop:
return nil
2016-02-02 18:03:40 +01:00
case index, ok := <-serviceCatalog:
if !ok {
return errors.New("Consul service list nil")
}
log.Debug("List of services changed")
nodes, err := p.getNodes(index)
2016-02-02 18:03:40 +01:00
if err != nil {
return err
}
configuration := p.buildConfig(nodes)
2016-02-02 18:03:40 +01:00
configurationChan <- types.ConfigMessage{
ProviderName: "consul_catalog",
Configuration: configuration,
}
}
}
}
// setupFrontEndTemplate creates the base template that frontend rules are
// parsed into, registering the getAttribute, getTag and hasTag helpers.
func (p *CatalogProvider) setupFrontEndTemplate() {
	helpers := template.FuncMap{
		"getAttribute": p.getAttribute,
		"getTag":       p.getTag,
		"hasTag":       p.hasTag,
	}
	p.frontEndRuleTemplate = template.New("consul catalog frontend rule").Funcs(helpers)
}
// Provide allows the consul catalog provider to provide configurations to traefik
2016-02-02 18:03:40 +01:00
// using the given configuration channel.
func (p *CatalogProvider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
2016-02-02 18:03:40 +01:00
config := api.DefaultConfig()
config.Address = p.Endpoint
2016-02-02 18:03:40 +01:00
client, err := api.NewClient(config)
if err != nil {
return err
}
p.client = client
p.Constraints = append(p.Constraints, constraints...)
p.setupFrontEndTemplate()
2016-02-02 18:03:40 +01:00
pool.Go(func(stop chan bool) {
2016-02-02 18:03:40 +01:00
notify := func(err error, time time.Duration) {
log.Errorf("Consul connection error %+v, retrying in %s", err, time)
}
2016-08-19 14:24:09 +02:00
operation := func() error {
return p.watch(configurationChan, stop)
2016-02-02 18:03:40 +01:00
}
2016-12-08 13:32:12 +01:00
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
2016-02-02 18:03:40 +01:00
if err != nil {
2016-08-19 10:36:54 +02:00
log.Errorf("Cannot connect to consul server %+v", err)
2016-02-02 18:03:40 +01:00
}
})
2016-02-02 18:03:40 +01:00
return err
}