Merge pull request #183 from keis/consul-catalog

WIP consul catalog provider
This commit is contained in:
Vincent Demeester 2016-02-24 09:35:26 +01:00
commit 9964654495
14 changed files with 552 additions and 21 deletions

39
cmd.go
View file

@ -39,28 +39,30 @@ var versionCmd = &cobra.Command{
var arguments = struct { var arguments = struct {
GlobalConfiguration GlobalConfiguration
web bool web bool
file bool file bool
docker bool docker bool
dockerTLS bool dockerTLS bool
marathon bool marathon bool
consul bool consul bool
zookeeper bool consulCatalog bool
etcd bool zookeeper bool
boltdb bool etcd bool
boltdb bool
}{ }{
GlobalConfiguration{ GlobalConfiguration{
EntryPoints: make(EntryPoints), EntryPoints: make(EntryPoints),
Docker: &provider.Docker{ Docker: &provider.Docker{
TLS: &provider.DockerTLS{}, TLS: &provider.DockerTLS{},
}, },
File: &provider.File{}, File: &provider.File{},
Web: &WebProvider{}, Web: &WebProvider{},
Marathon: &provider.Marathon{}, Marathon: &provider.Marathon{},
Consul: &provider.Consul{}, Consul: &provider.Consul{},
Zookeeper: &provider.Zookepper{}, ConsulCatalog: &provider.ConsulCatalog{},
Etcd: &provider.Etcd{}, Zookeeper: &provider.Zookepper{},
Boltdb: &provider.BoltDb{}, Etcd: &provider.Etcd{},
Boltdb: &provider.BoltDb{},
}, },
false, false,
false, false,
@ -71,6 +73,7 @@ var arguments = struct {
false, false,
false, false,
false, false,
false,
} }
func init() { func init() {
@ -119,6 +122,10 @@ func init() {
traefikCmd.PersistentFlags().StringVar(&arguments.Consul.Endpoint, "consul.endpoint", "127.0.0.1:8500", "Consul server endpoint") traefikCmd.PersistentFlags().StringVar(&arguments.Consul.Endpoint, "consul.endpoint", "127.0.0.1:8500", "Consul server endpoint")
traefikCmd.PersistentFlags().StringVar(&arguments.Consul.Prefix, "consul.prefix", "/traefik", "Prefix used for KV store") traefikCmd.PersistentFlags().StringVar(&arguments.Consul.Prefix, "consul.prefix", "/traefik", "Prefix used for KV store")
traefikCmd.PersistentFlags().BoolVar(&arguments.consulCatalog, "consulCatalog", false, "Enable Consul catalog backend")
traefikCmd.PersistentFlags().StringVar(&arguments.ConsulCatalog.Domain, "consulCatalog.domain", "", "Default domain used")
traefikCmd.PersistentFlags().StringVar(&arguments.ConsulCatalog.Endpoint, "consulCatalog.endpoint", "127.0.0.1:8500", "Consul server endpoint")
traefikCmd.PersistentFlags().BoolVar(&arguments.zookeeper, "zookeeper", false, "Enable Zookeeper backend") traefikCmd.PersistentFlags().BoolVar(&arguments.zookeeper, "zookeeper", false, "Enable Zookeeper backend")
traefikCmd.PersistentFlags().BoolVar(&arguments.Zookeeper.Watch, "zookeeper.watch", true, "Watch provider") traefikCmd.PersistentFlags().BoolVar(&arguments.Zookeeper.Watch, "zookeeper.watch", true, "Watch provider")
traefikCmd.PersistentFlags().StringVar(&arguments.Zookeeper.Filename, "zookeeper.filename", "", "Override default configuration template. For advanced users :)") traefikCmd.PersistentFlags().StringVar(&arguments.Zookeeper.Filename, "zookeeper.filename", "", "Override default configuration template. For advanced users :)")

View file

@ -30,6 +30,7 @@ type GlobalConfiguration struct {
Web *WebProvider Web *WebProvider
Marathon *provider.Marathon Marathon *provider.Marathon
Consul *provider.Consul Consul *provider.Consul
ConsulCatalog *provider.ConsulCatalog
Etcd *provider.Etcd Etcd *provider.Etcd
Zookeeper *provider.Zookepper Zookeeper *provider.Zookepper
Boltdb *provider.BoltDb Boltdb *provider.BoltDb
@ -224,6 +225,9 @@ func LoadConfiguration() *GlobalConfiguration {
if arguments.consul { if arguments.consul {
viper.Set("consul", arguments.Consul) viper.Set("consul", arguments.Consul)
} }
if arguments.consulCatalog {
viper.Set("consulCatalog", arguments.ConsulCatalog)
}
if arguments.zookeeper { if arguments.zookeeper {
viper.Set("zookeeper", arguments.Zookeeper) viper.Set("zookeeper", arguments.Zookeeper)
} }

View file

@ -12,6 +12,7 @@ ___
- [Docker backend](#docker) - [Docker backend](#docker)
- [Mesos/Marathon backend](#marathon) - [Mesos/Marathon backend](#marathon)
- [Consul backend](#consul) - [Consul backend](#consul)
- [Consul catalog backend](#consulcatalog)
- [Etcd backend](#etcd) - [Etcd backend](#etcd)
- [Zookeeper backend](#zk) - [Zookeeper backend](#zk)
- [Boltdb backend](#boltdb) - [Boltdb backend](#boltdb)
@ -109,6 +110,9 @@ Flags:
--consul.filename string Override default configuration template. For advanced users :) --consul.filename string Override default configuration template. For advanced users :)
--consul.prefix string Prefix used for KV store (default "/traefik") --consul.prefix string Prefix used for KV store (default "/traefik")
--consul.watch Watch provider (default true) --consul.watch Watch provider (default true)
--consulCatalog Enable Consul catalog backend
--consulCatalog.domain string Default domain used
--consulCatalog.endpoint string Consul server endpoint (default "127.0.0.1:8500")
--defaultEntryPoints value Entrypoints to be used by frontends that do not specify any entrypoint (default &main.DefaultEntryPoints(nil)) --defaultEntryPoints value Entrypoints to be used by frontends that do not specify any entrypoint (default &main.DefaultEntryPoints(nil))
--docker Enable Docker backend --docker Enable Docker backend
--docker.domain string Default domain used --docker.domain string Default domain used
@ -842,6 +846,37 @@ The Keys-Values structure should look (using `prefix = "/traefik"`):
| `/traefik/frontends/frontend2/routes/test_2/value` | `/test` | | `/traefik/frontends/frontend2/routes/test_2/value` | `/test` |
## <a id="consulcatalog"></a> Consul catalog backend
Træfɪk can be configured to use service discovery catalog of Consul as a backend configuration:
```toml
################################################################
# Consul Catalog configuration backend
################################################################
# Enable Consul Catalog configuration backend
#
# Optional
#
[consulCatalog]
# Consul server endpoint
#
# Required
#
endpoint = "127.0.0.1:8500"
# Default domain used.
#
# Optional
#
domain = "consul.localhost"
```
This backend will create routes matching on hostname based on the service name
used in consul.
## <a id="zk"></a> Zookeeper backend ## <a id="zk"></a> Zookeeper backend
Træfɪk can be configured to use Zookeeper as a backend configuration: Træfɪk can be configured to use Zookeeper as a backend configuration:

View file

@ -124,7 +124,7 @@ import:
- package: gopkg.in/alecthomas/kingpin.v2 - package: gopkg.in/alecthomas/kingpin.v2
ref: 639879d6110b1b0409410c7b737ef0bb18325038 ref: 639879d6110b1b0409410c7b737ef0bb18325038
- package: github.com/docker/libcompose - package: github.com/docker/libcompose
ref: 79ef5d150f053a5b12f16b02d8844ed7cf33611a ref: d3089811c119a211469a9cc93caea684d937e5d3
subpackages: subpackages:
- docker - docker
- logger - logger

View file

@ -0,0 +1,132 @@
package main
import (
"fmt"
"io/ioutil"
"net/http"
"os"
"os/exec"
"time"
"github.com/docker/docker/opts"
"github.com/fsouza/go-dockerclient"
"github.com/hashicorp/consul/api"
checker "github.com/vdemeester/shakers"
check "gopkg.in/check.v1"
)
// Consul catalog test suites
type ConsulCatalogSuite struct {
BaseSuite
consulIP string
consulClient *api.Client
dockerClient *docker.Client
}
func (s *ConsulCatalogSuite) GetContainer(name string) (*docker.Container, error) {
return s.dockerClient.InspectContainer(name)
}
func (s *ConsulCatalogSuite) SetUpSuite(c *check.C) {
dockerHost := os.Getenv("DOCKER_HOST")
if dockerHost == "" {
// FIXME Handle windows -- see if dockerClient already handle that or not
dockerHost = fmt.Sprintf("unix://%s", opts.DefaultUnixSocket)
}
// Make sure we can speak to docker
dockerClient, err := docker.NewClient(dockerHost)
c.Assert(err, checker.IsNil, check.Commentf("Error connecting to docker daemon"))
s.dockerClient = dockerClient
s.createComposeProject(c, "consul_catalog")
err = s.composeProject.Up()
c.Assert(err, checker.IsNil, check.Commentf("Error starting project"))
consul, err := s.GetContainer("integration-test-consul_catalog_consul_1")
c.Assert(err, checker.IsNil, check.Commentf("Error finding consul container"))
s.consulIP = consul.NetworkSettings.IPAddress
config := api.DefaultConfig()
config.Address = s.consulIP + ":8500"
consulClient, err := api.NewClient(config)
if err != nil {
c.Fatalf("Error creating consul client")
}
s.consulClient = consulClient
// Wait for consul to elect itself leader
time.Sleep(2000 * time.Millisecond)
}
func (s *ConsulCatalogSuite) registerService(name string, address string, port int) error {
catalog := s.consulClient.Catalog()
_, err := catalog.Register(
&api.CatalogRegistration{
Node: address,
Address: address,
Service: &api.AgentService{
ID: name,
Service: name,
Address: address,
Port: port,
},
},
&api.WriteOptions{},
)
return err
}
func (s *ConsulCatalogSuite) deregisterService(name string, address string) error {
catalog := s.consulClient.Catalog()
_, err := catalog.Deregister(
&api.CatalogDeregistration{
Node: address,
Address: address,
ServiceID: name,
},
&api.WriteOptions{},
)
return err
}
func (s *ConsulCatalogSuite) TestSimpleConfiguration(c *check.C) {
cmd := exec.Command(traefikBinary, "--consulCatalog", "--consulCatalog.endpoint="+s.consulIP+":8500", "--configFile=fixtures/consul_catalog/simple.toml")
err := cmd.Start()
c.Assert(err, checker.IsNil)
defer cmd.Process.Kill()
time.Sleep(500 * time.Millisecond)
// TODO validate : run on 80
resp, err := http.Get("http://127.0.0.1:8000/")
// Expected a 404 as we did not configure anything
c.Assert(err, checker.IsNil)
c.Assert(resp.StatusCode, checker.Equals, 404)
}
func (s *ConsulCatalogSuite) TestSingleService(c *check.C) {
cmd := exec.Command(traefikBinary, "--consulCatalog", "--consulCatalog.endpoint="+s.consulIP+":8500", "--consulCatalog.domain=consul.localhost", "--configFile=fixtures/consul_catalog/simple.toml")
err := cmd.Start()
c.Assert(err, checker.IsNil)
defer cmd.Process.Kill()
nginx, err := s.GetContainer("integration-test-consul_catalog_nginx_1")
c.Assert(err, checker.IsNil, check.Commentf("Error finding nginx container"))
err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80)
c.Assert(err, checker.IsNil, check.Commentf("Error registering service"))
defer s.deregisterService("test", nginx.NetworkSettings.IPAddress)
time.Sleep(5000 * time.Millisecond)
client := &http.Client{}
req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
c.Assert(err, checker.IsNil)
req.Host = "test.consul.localhost"
resp, err := client.Do(req)
c.Assert(err, checker.IsNil)
c.Assert(resp.StatusCode, checker.Equals, 200)
_, err = ioutil.ReadAll(resp.Body)
c.Assert(err, checker.IsNil)
}

View file

@ -0,0 +1,9 @@
defaultEntryPoints = ["http"]
logLevel = "DEBUG"
[entryPoints]
[entryPoints.http]
address = ":8000"
[consulCatalog]
domain = "consul.localhost"

View file

@ -28,6 +28,7 @@ func init() {
check.Suite(&FileSuite{}) check.Suite(&FileSuite{})
check.Suite(&DockerSuite{}) check.Suite(&DockerSuite{})
check.Suite(&ConsulSuite{}) check.Suite(&ConsulSuite{})
check.Suite(&ConsulCatalogSuite{})
check.Suite(&MarathonSuite{}) check.Suite(&MarathonSuite{})
} }
@ -80,7 +81,9 @@ func (s *BaseSuite) TearDownSuite(c *check.C) {
func (s *BaseSuite) createComposeProject(c *check.C, name string) { func (s *BaseSuite) createComposeProject(c *check.C, name string) {
composeProject, err := docker.NewProject(&docker.Context{ composeProject, err := docker.NewProject(&docker.Context{
Context: project.Context{ Context: project.Context{
ComposeFile: fmt.Sprintf("resources/compose/%s.yml", name), ComposeFiles: []string{
fmt.Sprintf("resources/compose/%s.yml", name),
},
ProjectName: fmt.Sprintf("integration-test-%s", name), ProjectName: fmt.Sprintf("integration-test-%s", name),
}, },
}) })

View file

@ -0,0 +1,17 @@
consul:
image: progrium/consul
command: -server -bootstrap -log-level debug -ui-dir /ui
ports:
- "8400:8400"
- "8500:8500"
- "8600:53/udp"
expose:
- "8300"
- "8301"
- "8301/udp"
- "8302"
- "8302/udp"
nginx:
image: nginx
ports:
- "8881:80"

View file

@ -3,7 +3,7 @@ zk:
net: host net: host
environment: environment:
ZK_CONFIG: tickTime=2000,initLimit=10,syncLimit=5,maxClientCnxns=128,forceSync=no,clientPort=2181 ZK_CONFIG: tickTime=2000,initLimit=10,syncLimit=5,maxClientCnxns=128,forceSync=no,clientPort=2181
ZK_ID: 1 ZK_ID: " 1"
master: master:
image: mesosphere/mesos-master:0.23.0-1.0.ubuntu1404 image: mesosphere/mesos-master:0.23.0-1.0.ubuntu1404
@ -12,7 +12,7 @@ master:
MESOS_ZK: zk://127.0.0.1:2181/mesos MESOS_ZK: zk://127.0.0.1:2181/mesos
MESOS_HOSTNAME: 127.0.0.1 MESOS_HOSTNAME: 127.0.0.1
MESOS_IP: 127.0.0.1 MESOS_IP: 127.0.0.1
MESOS_QUORUM: 1 MESOS_QUORUM: " 1"
MESOS_CLUSTER: docker-compose MESOS_CLUSTER: docker-compose
MESOS_WORK_DIR: /var/lib/mesos MESOS_WORK_DIR: /var/lib/mesos

199
provider/consul_catalog.go Normal file
View file

@ -0,0 +1,199 @@
package provider
import (
"errors"
"strings"
"text/template"
"time"
log "github.com/Sirupsen/logrus"
"github.com/cenkalti/backoff"
"github.com/emilevauge/traefik/types"
"github.com/hashicorp/consul/api"
)
const (
// DefaultWatchWaitTime is the duration to wait when polling consul
DefaultWatchWaitTime = 15 * time.Second
)
// ConsulCatalog holds configurations of the Consul catalog provider.
type ConsulCatalog struct {
BaseProvider `mapstructure:",squash"`
Endpoint string
Domain string
client *api.Client
}
type catalogUpdate struct {
Service string
Nodes []*api.ServiceEntry
}
func (provider *ConsulCatalog) watchServices(stopCh <-chan struct{}) <-chan map[string][]string {
watchCh := make(chan map[string][]string)
catalog := provider.client.Catalog()
go func() {
defer close(watchCh)
opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
for {
select {
case <-stopCh:
return
default:
}
data, meta, err := catalog.Services(opts)
if err != nil {
log.WithError(err).Errorf("Failed to list services")
return
}
// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the key didn't changed.
if opts.WaitIndex == meta.LastIndex {
continue
}
opts.WaitIndex = meta.LastIndex
if data != nil {
watchCh <- data
}
}
}()
return watchCh
}
func (provider *ConsulCatalog) healthyNodes(service string) (catalogUpdate, error) {
health := provider.client.Health()
opts := &api.QueryOptions{}
data, _, err := health.Service(service, "", true, opts)
if err != nil {
log.WithError(err).Errorf("Failed to fetch details of " + service)
return catalogUpdate{}, err
}
return catalogUpdate{
Service: service,
Nodes: data,
}, nil
}
func (provider *ConsulCatalog) getBackend(node *api.ServiceEntry) string {
return strings.ToLower(node.Service.Service)
}
func (provider *ConsulCatalog) getFrontendValue(service string) string {
return service + "." + provider.Domain
}
func (provider *ConsulCatalog) buildConfig(catalog []catalogUpdate) *types.Configuration {
var FuncMap = template.FuncMap{
"getBackend": provider.getBackend,
"getFrontendValue": provider.getFrontendValue,
"replace": replace,
}
allNodes := []*api.ServiceEntry{}
serviceNames := []string{}
for _, info := range catalog {
if len(info.Nodes) > 0 {
serviceNames = append(serviceNames, info.Service)
allNodes = append(allNodes, info.Nodes...)
}
}
templateObjects := struct {
Services []string
Nodes []*api.ServiceEntry
}{
Services: serviceNames,
Nodes: allNodes,
}
configuration, err := provider.getConfiguration("templates/consul_catalog.tmpl", FuncMap, templateObjects)
if err != nil {
log.WithError(err).Error("Failed to create config")
}
return configuration
}
func (provider *ConsulCatalog) getNodes(index map[string][]string) ([]catalogUpdate, error) {
visited := make(map[string]bool)
nodes := []catalogUpdate{}
for service := range index {
name := strings.ToLower(service)
if !strings.Contains(name, " ") && !visited[name] {
visited[name] = true
log.WithFields(log.Fields{
"service": name,
}).Debug("Fetching service")
healthy, err := provider.healthyNodes(name)
if err != nil {
return nil, err
}
nodes = append(nodes, healthy)
}
}
return nodes, nil
}
func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessage) error {
stopCh := make(chan struct{})
serviceCatalog := provider.watchServices(stopCh)
defer close(stopCh)
for {
select {
case index, ok := <-serviceCatalog:
if !ok {
return errors.New("Consul service list nil")
}
log.Debug("List of services changed")
nodes, err := provider.getNodes(index)
if err != nil {
return err
}
configuration := provider.buildConfig(nodes)
configurationChan <- types.ConfigMessage{
ProviderName: "consul_catalog",
Configuration: configuration,
}
}
}
}
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMessage) error {
config := api.DefaultConfig()
config.Address = provider.Endpoint
client, err := api.NewClient(config)
if err != nil {
return err
}
provider.client = client
go func() {
notify := func(err error, time time.Duration) {
log.Errorf("Consul connection error %+v, retrying in %s", err, time)
}
worker := func() error {
return provider.watch(configurationChan)
}
err := backoff.RetryNotify(worker, backoff.NewExponentialBackOff(), notify)
if err != nil {
log.Fatalf("Cannot connect to consul server %+v", err)
}
}()
return err
}

View file

@ -0,0 +1,110 @@
package provider
import (
"reflect"
"testing"
"github.com/emilevauge/traefik/types"
"github.com/hashicorp/consul/api"
)
func TestConsulCatalogGetFrontendValue(t *testing.T) {
provider := &ConsulCatalog{
Domain: "localhost",
}
services := []struct {
service string
expected string
}{
{
service: "foo",
expected: "foo.localhost",
},
}
for _, e := range services {
actual := provider.getFrontendValue(e.service)
if actual != e.expected {
t.Fatalf("expected %q, got %q", e.expected, actual)
}
}
}
func TestConsulCatalogBuildConfig(t *testing.T) {
provider := &ConsulCatalog{
Domain: "localhost",
}
cases := []struct {
nodes []catalogUpdate
expectedFrontends map[string]*types.Frontend
expectedBackends map[string]*types.Backend
}{
{
nodes: []catalogUpdate{},
expectedFrontends: map[string]*types.Frontend{},
expectedBackends: map[string]*types.Backend{},
},
{
nodes: []catalogUpdate{
{
Service: "test",
},
},
expectedFrontends: map[string]*types.Frontend{},
expectedBackends: map[string]*types.Backend{},
},
{
nodes: []catalogUpdate{
{
Service: "test",
Nodes: []*api.ServiceEntry{
{
Service: &api.AgentService{
Service: "test",
Port: 80,
},
Node: &api.Node{
Node: "localhost",
Address: "127.0.0.1",
},
},
},
},
},
expectedFrontends: map[string]*types.Frontend{
"frontend-test": {
Backend: "backend-test",
Routes: map[string]types.Route{
"route-host-test": {
Rule: "Host",
Value: "test.localhost",
},
},
},
},
expectedBackends: map[string]*types.Backend{
"backend-test": {
Servers: map[string]types.Server{
"server-localhost-80": {
URL: "http://127.0.0.1:80",
},
},
CircuitBreaker: nil,
LoadBalancer: nil,
},
},
},
}
for _, c := range cases {
actualConfig := provider.buildConfig(c.nodes)
if !reflect.DeepEqual(actualConfig.Backends, c.expectedBackends) {
t.Fatalf("expected %#v, got %#v", c.expectedBackends, actualConfig.Backends)
}
if !reflect.DeepEqual(actualConfig.Frontends, c.expectedFrontends) {
t.Fatalf("expected %#v, got %#v", c.expectedFrontends, actualConfig.Frontends)
}
}
}

View file

@ -184,6 +184,9 @@ func (server *Server) configureProviders() {
if server.globalConfiguration.Consul != nil { if server.globalConfiguration.Consul != nil {
server.providers = append(server.providers, server.globalConfiguration.Consul) server.providers = append(server.providers, server.globalConfiguration.Consul)
} }
if server.globalConfiguration.ConsulCatalog != nil {
server.providers = append(server.providers, server.globalConfiguration.ConsulCatalog)
}
if server.globalConfiguration.Etcd != nil { if server.globalConfiguration.Etcd != nil {
server.providers = append(server.providers, server.globalConfiguration.Etcd) server.providers = append(server.providers, server.globalConfiguration.Etcd)
} }

View file

@ -0,0 +1,13 @@
[backends]{{range .Nodes}}
[backends.backend-{{getBackend .}}.servers.server-{{.Node.Node | replace "." "-"}}-{{.Service.Port}}]
url = "http://{{.Node.Address}}:{{.Service.Port}}"
{{end}}
[frontends]{{range .Services}}
[frontends.frontend-{{.}}]
backend = "backend-{{.}}"
passHostHeader = false
[frontends.frontend-{{.}}.routes.route-host-{{.}}]
rule = "Host"
value = "{{getFrontendValue .}}"
{{end}}

View file

@ -8,7 +8,6 @@ import (
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
if err := traefikCmd.Execute(); err != nil { if err := traefikCmd.Execute(); err != nil {
fmtlog.Println(err) fmtlog.Println(err)
os.Exit(-1) os.Exit(-1)