Merge pull request #340 from containous/fix-etcd-backend

Fix etcd backend with prefix /
This commit is contained in:
Vincent Demeester 2016-05-03 16:16:10 +02:00
commit c1a12a58eb
13 changed files with 471 additions and 58 deletions

View file

@ -140,4 +140,4 @@ Founded in 2014, Asteris creates next-generation infrastructure software for the
## Credits ## Credits
Thanks you [Peka](http://peka.byethost11.com/photoblog/) for your awesome work on the logo ![logo](docs/img/traefik.icon.png) Kudos to [Peka](http://peka.byethost11.com/photoblog/) for his awesome work on the logo ![logo](docs/img/traefik.icon.png)

View file

@ -0,0 +1,4 @@
etcd:
image: gcr.io/google_containers/etcd:2.2.1
net: host
command: ['/usr/local/bin/etcd', '--addr=127.0.0.1:4001', '--bind-addr=0.0.0.0:4001', '--data-dir=/var/etcd/data']

25
examples/etcd-config.sh Executable file
View file

@ -0,0 +1,25 @@
#!/bin/sh
# backend 1
curl -i -H "Accept: application/json" -X PUT -d value="NetworkErrorRatio() > 0.5" http://localhost:4001/v2/keys/traefik/backends/backend1/circuitbreaker/expression
curl -i -H "Accept: application/json" -X PUT -d value="http://172.17.0.2:80" http://localhost:4001/v2/keys/traefik/backends/backend1/servers/server1/url
curl -i -H "Accept: application/json" -X PUT -d value="10" http://localhost:4001/v2/keys/traefik/backends/backend1/servers/server1/weight
curl -i -H "Accept: application/json" -X PUT -d value="http://172.17.0.3:80" http://localhost:4001/v2/keys/traefik/backends/backend1/servers/server2/url
curl -i -H "Accept: application/json" -X PUT -d value="1" http://localhost:4001/v2/keys/traefik/backends/backend1/servers/server2/weight
# backend 2
curl -i -H "Accept: application/json" -X PUT -d value="drr" http://localhost:4001/v2/keys/traefik/backends/backend2/loadbalancer/method
curl -i -H "Accept: application/json" -X PUT -d value="http://172.17.0.4:80" http://localhost:4001/v2/keys/traefik/backends/backend2/servers/server1/url
curl -i -H "Accept: application/json" -X PUT -d value="1" http://localhost:4001/v2/keys/traefik/backends/backend2/servers/server1/weight
curl -i -H "Accept: application/json" -X PUT -d value="http://172.17.0.5:80" http://localhost:4001/v2/keys/traefik/backends/backend2/servers/server2/url
curl -i -H "Accept: application/json" -X PUT -d value="2" http://localhost:4001/v2/keys/traefik/backends/backend2/servers/server2/weight
# frontend 1
curl -i -H "Accept: application/json" -X PUT -d value="backend2" http://localhost:4001/v2/keys/traefik/frontends/frontend1/backend
curl -i -H "Accept: application/json" -X PUT -d value="http" http://localhost:4001/v2/keys/traefik/frontends/frontend1/entrypoints
curl -i -H "Accept: application/json" -X PUT -d value="Host:test.localhost" http://localhost:4001/v2/keys/traefik/frontends/frontend1/routes/test_1/rule
# frontend 2
curl -i -H "Accept: application/json" -X PUT -d value="backend1" http://localhost:4001/v2/keys/traefik/frontends/frontend2/backend
curl -i -H "Accept: application/json" -X PUT -d value="http" http://localhost:4001/v2/keys/traefik/frontends/frontend2/entrypoints
curl -i -H "Accept: application/json" -X PUT -d value="Path:/test" http://localhost:4001/v2/keys/traefik/frontends/frontend2/routes/test_2/rule

View file

@ -5,29 +5,186 @@ import (
"os/exec" "os/exec"
"time" "time"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/go-check/check" "github.com/go-check/check"
"errors"
"github.com/containous/traefik/integration/utils"
checker "github.com/vdemeester/shakers" checker "github.com/vdemeester/shakers"
"io/ioutil"
"os"
"strings"
) )
// Consul test suites (using libcompose) // Consul test suites (using libcompose)
type ConsulSuite struct{ BaseSuite } type ConsulSuite struct {
BaseSuite
kv store.Store
}
func (s *ConsulSuite) SetUpSuite(c *check.C) { func (s *ConsulSuite) SetUpSuite(c *check.C) {
s.createComposeProject(c, "consul") s.createComposeProject(c, "consul")
s.composeProject.Start(c)
consul.Register()
kv, err := libkv.NewStore(
store.CONSUL,
[]string{s.composeProject.Container(c, "consul").NetworkSettings.IPAddress + ":8500"},
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
)
if err != nil {
c.Fatal("Cannot create store consul")
}
s.kv = kv
// wait for consul
err = utils.Try(60*time.Second, func() error {
_, err := kv.Exists("test")
if err != nil {
return err
}
return nil
})
c.Assert(err, checker.IsNil)
} }
func (s *ConsulSuite) TestSimpleConfiguration(c *check.C) { func (s *ConsulSuite) TestSimpleConfiguration(c *check.C) {
cmd := exec.Command(traefikBinary, "--configFile=fixtures/consul/simple.toml") consulHost := s.composeProject.Container(c, "consul").NetworkSettings.IPAddress
file := s.adaptFile(c, "fixtures/consul/simple.toml", struct{ ConsulHost string }{consulHost})
defer os.Remove(file)
cmd := exec.Command(traefikBinary, "--configFile="+file)
err := cmd.Start() err := cmd.Start()
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
defer cmd.Process.Kill() defer cmd.Process.Kill()
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
// TODO validate : run on 80
resp, err := http.Get("http://127.0.0.1:8000/") resp, err := http.Get("http://127.0.0.1:8000/")
// Expected a 404 as we did not configure anything // Expected a 404 as we did not configure anything
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
c.Assert(resp.StatusCode, checker.Equals, 404) c.Assert(resp.StatusCode, checker.Equals, 404)
} }
func (s *ConsulSuite) TestNominalConfiguration(c *check.C) {
consulHost := s.composeProject.Container(c, "consul").NetworkSettings.IPAddress
file := s.adaptFile(c, "fixtures/consul/simple.toml", struct{ ConsulHost string }{consulHost})
defer os.Remove(file)
cmd := exec.Command(traefikBinary, "--configFile="+file)
err := cmd.Start()
c.Assert(err, checker.IsNil)
defer cmd.Process.Kill()
whoami1 := s.composeProject.Container(c, "whoami1")
whoami2 := s.composeProject.Container(c, "whoami2")
whoami3 := s.composeProject.Container(c, "whoami3")
whoami4 := s.composeProject.Container(c, "whoami4")
backend1 := map[string]string{
"traefik/backends/backend1/circuitbreaker/expression": "NetworkErrorRatio() > 0.5",
"traefik/backends/backend1/servers/server1/url": "http://" + whoami1.NetworkSettings.IPAddress + ":80",
"traefik/backends/backend1/servers/server1/weight": "10",
"traefik/backends/backend1/servers/server2/url": "http://" + whoami2.NetworkSettings.IPAddress + ":80",
"traefik/backends/backend1/servers/server2/weight": "1",
}
backend2 := map[string]string{
"traefik/backends/backend2/loadbalancer/method": "drr",
"traefik/backends/backend2/servers/server1/url": "http://" + whoami3.NetworkSettings.IPAddress + ":80",
"traefik/backends/backend2/servers/server1/weight": "1",
"traefik/backends/backend2/servers/server2/url": "http://" + whoami4.NetworkSettings.IPAddress + ":80",
"traefik/backends/backend2/servers/server2/weight": "2",
}
frontend1 := map[string]string{
"traefik/frontends/frontend1/backend": "backend2",
"traefik/frontends/frontend1/entrypoints": "http",
"traefik/frontends/frontend1/routes/test_1/rule": "Host:test.localhost",
}
frontend2 := map[string]string{
"traefik/frontends/frontend2/backend": "backend1",
"traefik/frontends/frontend2/entrypoints": "http",
"traefik/frontends/frontend2/routes/test_2/rule": "Path:/test",
}
for key, value := range backend1 {
err := s.kv.Put(key, []byte(value), nil)
c.Assert(err, checker.IsNil)
}
for key, value := range backend2 {
err := s.kv.Put(key, []byte(value), nil)
c.Assert(err, checker.IsNil)
}
for key, value := range frontend1 {
err := s.kv.Put(key, []byte(value), nil)
c.Assert(err, checker.IsNil)
}
for key, value := range frontend2 {
err := s.kv.Put(key, []byte(value), nil)
c.Assert(err, checker.IsNil)
}
// wait for consul
err = utils.Try(60*time.Second, func() error {
_, err := s.kv.Exists("traefik/frontends/frontend2/routes/test_2/rule")
if err != nil {
return err
}
return nil
})
c.Assert(err, checker.IsNil)
// wait for traefik
err = utils.TryRequest("http://127.0.0.1:8081/api/providers", 60*time.Second, func(res *http.Response) error {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
if !strings.Contains(string(body), "Path:/test") {
return errors.New("Incorrect traefik config")
}
return nil
})
c.Assert(err, checker.IsNil)
client := &http.Client{}
req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
c.Assert(err, checker.IsNil)
req.Host = "test.localhost"
response, err := client.Do(req)
c.Assert(err, checker.IsNil)
c.Assert(response.StatusCode, checker.Equals, 200)
body, err := ioutil.ReadAll(response.Body)
c.Assert(err, checker.IsNil)
if !strings.Contains(string(body), whoami3.NetworkSettings.IPAddress) &&
!strings.Contains(string(body), whoami4.NetworkSettings.IPAddress) {
c.Fail()
}
req, err = http.NewRequest("GET", "http://127.0.0.1:8000/test", nil)
c.Assert(err, checker.IsNil)
response, err = client.Do(req)
c.Assert(err, checker.IsNil)
c.Assert(response.StatusCode, checker.Equals, 200)
body, err = ioutil.ReadAll(response.Body)
c.Assert(err, checker.IsNil)
if !strings.Contains(string(body), whoami1.NetworkSettings.IPAddress) &&
!strings.Contains(string(body), whoami2.NetworkSettings.IPAddress) {
c.Fail()
}
req, err = http.NewRequest("GET", "http://127.0.0.1:8000/test2", nil)
resp, err := client.Do(req)
c.Assert(err, checker.IsNil)
c.Assert(resp.StatusCode, checker.Equals, 404)
req, err = http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
req.Host = "test2.localhost"
resp, err = client.Do(req)
c.Assert(err, checker.IsNil)
c.Assert(resp.StatusCode, checker.Equals, 404)
}

View file

@ -8,17 +8,58 @@ import (
"github.com/go-check/check" "github.com/go-check/check"
checker "github.com/vdemeester/shakers" checker "github.com/vdemeester/shakers"
"errors"
"fmt"
"github.com/containous/traefik/integration/utils"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/etcd"
"io/ioutil"
"os"
"strings"
) )
// Etcd test suites (using libcompose) // Etcd test suites (using libcompose)
type EtcdSuite struct{ BaseSuite } type EtcdSuite struct {
BaseSuite
kv store.Store
}
func (s *EtcdSuite) SetUpSuite(c *check.C) { func (s *EtcdSuite) SetUpSuite(c *check.C) {
s.createComposeProject(c, "etcd") s.createComposeProject(c, "etcd")
s.composeProject.Start(c)
etcd.Register()
url := s.composeProject.Container(c, "etcd").NetworkSettings.IPAddress + ":4001"
kv, err := libkv.NewStore(
store.ETCD,
[]string{url},
&store.Config{
ConnectionTimeout: 10 * time.Second,
},
)
if err != nil {
c.Fatal("Cannot create store etcd")
}
s.kv = kv
// wait for etcd
err = utils.Try(60*time.Second, func() error {
_, err := kv.Exists("test")
if err != nil {
return fmt.Errorf("Etcd connection error to %s: %v", url, err)
}
return nil
})
c.Assert(err, checker.IsNil)
} }
func (s *EtcdSuite) TestSimpleConfiguration(c *check.C) { func (s *EtcdSuite) TestSimpleConfiguration(c *check.C) {
cmd := exec.Command(traefikBinary, "--configFile=fixtures/etcd/simple.toml") etcdHost := s.composeProject.Container(c, "etcd").NetworkSettings.IPAddress
file := s.adaptFile(c, "fixtures/etcd/simple.toml", struct{ EtcdHost string }{etcdHost})
defer os.Remove(file)
cmd := exec.Command(traefikBinary, "--configFile="+file)
err := cmd.Start() err := cmd.Start()
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
defer cmd.Process.Kill() defer cmd.Process.Kill()
@ -31,3 +72,123 @@ func (s *EtcdSuite) TestSimpleConfiguration(c *check.C) {
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
c.Assert(resp.StatusCode, checker.Equals, 404) c.Assert(resp.StatusCode, checker.Equals, 404)
} }
func (s *EtcdSuite) TestNominalConfiguration(c *check.C) {
etcdHost := s.composeProject.Container(c, "etcd").NetworkSettings.IPAddress
file := s.adaptFile(c, "fixtures/etcd/simple.toml", struct{ EtcdHost string }{etcdHost})
defer os.Remove(file)
cmd := exec.Command(traefikBinary, "--configFile="+file)
err := cmd.Start()
c.Assert(err, checker.IsNil)
defer cmd.Process.Kill()
whoami1 := s.composeProject.Container(c, "whoami1")
whoami2 := s.composeProject.Container(c, "whoami2")
whoami3 := s.composeProject.Container(c, "whoami3")
whoami4 := s.composeProject.Container(c, "whoami4")
backend1 := map[string]string{
"/traefik/backends/backend1/circuitbreaker/expression": "NetworkErrorRatio() > 0.5",
"/traefik/backends/backend1/servers/server1/url": "http://" + whoami1.NetworkSettings.IPAddress + ":80",
"/traefik/backends/backend1/servers/server1/weight": "10",
"/traefik/backends/backend1/servers/server2/url": "http://" + whoami2.NetworkSettings.IPAddress + ":80",
"/traefik/backends/backend1/servers/server2/weight": "1",
}
backend2 := map[string]string{
"/traefik/backends/backend2/loadbalancer/method": "drr",
"/traefik/backends/backend2/servers/server1/url": "http://" + whoami3.NetworkSettings.IPAddress + ":80",
"/traefik/backends/backend2/servers/server1/weight": "1",
"/traefik/backends/backend2/servers/server2/url": "http://" + whoami4.NetworkSettings.IPAddress + ":80",
"/traefik/backends/backend2/servers/server2/weight": "2",
}
frontend1 := map[string]string{
"/traefik/frontends/frontend1/backend": "backend2",
"/traefik/frontends/frontend1/entrypoints": "http",
"/traefik/frontends/frontend1/routes/test_1/rule": "Host:test.localhost",
}
frontend2 := map[string]string{
"/traefik/frontends/frontend2/backend": "backend1",
"/traefik/frontends/frontend2/entrypoints": "http",
"/traefik/frontends/frontend2/routes/test_2/rule": "Path:/test",
}
for key, value := range backend1 {
err := s.kv.Put(key, []byte(value), nil)
c.Assert(err, checker.IsNil)
}
for key, value := range backend2 {
err := s.kv.Put(key, []byte(value), nil)
c.Assert(err, checker.IsNil)
}
for key, value := range frontend1 {
err := s.kv.Put(key, []byte(value), nil)
c.Assert(err, checker.IsNil)
}
for key, value := range frontend2 {
err := s.kv.Put(key, []byte(value), nil)
c.Assert(err, checker.IsNil)
}
// wait for etcd
err = utils.Try(60*time.Second, func() error {
_, err := s.kv.Exists("/traefik/frontends/frontend2/routes/test_2/rule")
if err != nil {
return err
}
return nil
})
c.Assert(err, checker.IsNil)
// wait for traefik
err = utils.TryRequest("http://127.0.0.1:8081/api/providers", 60*time.Second, func(res *http.Response) error {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return err
}
if !strings.Contains(string(body), "Path:/test") {
return errors.New("Incorrect traefik config")
}
return nil
})
c.Assert(err, checker.IsNil)
client := &http.Client{}
req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
c.Assert(err, checker.IsNil)
req.Host = "test.localhost"
response, err := client.Do(req)
c.Assert(err, checker.IsNil)
c.Assert(response.StatusCode, checker.Equals, 200)
body, err := ioutil.ReadAll(response.Body)
c.Assert(err, checker.IsNil)
if !strings.Contains(string(body), whoami3.NetworkSettings.IPAddress) &&
!strings.Contains(string(body), whoami4.NetworkSettings.IPAddress) {
c.Fail()
}
req, err = http.NewRequest("GET", "http://127.0.0.1:8000/test", nil)
c.Assert(err, checker.IsNil)
response, err = client.Do(req)
c.Assert(err, checker.IsNil)
c.Assert(response.StatusCode, checker.Equals, 200)
body, err = ioutil.ReadAll(response.Body)
c.Assert(err, checker.IsNil)
if !strings.Contains(string(body), whoami1.NetworkSettings.IPAddress) &&
!strings.Contains(string(body), whoami2.NetworkSettings.IPAddress) {
c.Fail()
}
req, err = http.NewRequest("GET", "http://127.0.0.1:8000/test2", nil)
req.Host = "test2.localhost"
resp, err := client.Do(req)
c.Assert(err, checker.IsNil)
c.Assert(resp.StatusCode, checker.Equals, 404)
req, err = http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
resp, err = client.Do(req)
c.Assert(err, checker.IsNil)
c.Assert(resp.StatusCode, checker.Equals, 404)
}

View file

@ -1,9 +1,16 @@
defaultEntryPoints = ["http"] defaultEntryPoints = ["http"]
logLevel = "DEBUG"
[entryPoints] [entryPoints]
[entryPoints.http] [entryPoints.http]
address = ":8000" address = ":8000"
logLevel = "DEBUG"
[consul] [consul]
endpoint = "{{.ConsulHost}}:8500"
watch = true
prefix = "traefik"
[web]
address = ":8081"

View file

@ -1,11 +1,11 @@
defaultEntryPoints = ["http"] defaultEntryPoints = ["http"]
logLevel = "DEBUG"
[entryPoints] [entryPoints]
[entryPoints.http] [entryPoints.http]
address = ":8000" address = ":8000"
logLevel = "DEBUG"
[docker] [docker]
# It's dynamagic ! # It's dynamagic !

View file

@ -1,10 +1,16 @@
defaultEntryPoints = ["http"] defaultEntryPoints = ["http"]
logLevel = "DEBUG"
[entryPoints] [entryPoints]
[entryPoints.http] [entryPoints.http]
address = ":8000" address = ":8000"
logLevel = "DEBUG"
[etcd] [etcd]
endpoint = "127.0.0.1:4003,127.0.0.1:4002,127.0.0.1:4001" endpoint = "{{.EtcdHost}}:4001"
prefix = "/traefik"
watch = true
[web]
address = ":8081"

View file

@ -64,7 +64,11 @@ func (s *BaseSuite) adaptFileForHost(c *check.C, path string) string {
// Default docker socket // Default docker socket
dockerHost = "unix:///var/run/docker.sock" dockerHost = "unix:///var/run/docker.sock"
} }
tempObjects := struct{ DockerHost string }{dockerHost}
return s.adaptFile(c, path, tempObjects)
}
func (s *BaseSuite) adaptFile(c *check.C, path string, tempObjects interface{}) string {
// Load file // Load file
tmpl, err := template.ParseFiles(path) tmpl, err := template.ParseFiles(path)
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
@ -74,7 +78,7 @@ func (s *BaseSuite) adaptFileForHost(c *check.C, path string) string {
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
defer tmpFile.Close() defer tmpFile.Close()
err = tmpl.ExecuteTemplate(tmpFile, prefix, struct{ DockerHost string }{dockerHost}) err = tmpl.ExecuteTemplate(tmpFile, prefix, tempObjects)
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
err = tmpFile.Sync() err = tmpFile.Sync()

View file

@ -1,6 +1,6 @@
consul: consul:
image: progrium/consul image: progrium/consul
command: -server -bootstrap -advertise 12.0.0.254 -log-level debug -ui-dir /ui command: -server -bootstrap -log-level debug -ui-dir /ui
ports: ports:
- "8400:8400" - "8400:8400"
- "8500:8500" - "8500:8500"
@ -10,4 +10,16 @@ consul:
- "8301" - "8301"
- "8301/udp" - "8301/udp"
- "8302" - "8302"
- "8302/udp" - "8302/udp"
whoami1:
image: emilevauge/whoami
whoami2:
image: emilevauge/whoami
whoami3:
image: emilevauge/whoami
whoami4:
image: emilevauge/whoami

View file

@ -1,30 +1,14 @@
etcd1: etcd:
image: quay.io/coreos/etcd:v2.2.0 image: containous/docker-etcd
net: "host"
command: > whoami1:
--name etcd1 image: emilevauge/whoami
--listen-peer-urls http://localhost:7001
--listen-client-urls http://localhost:4001 whoami2:
--initial-advertise-peer-urls http://localhost:7001 image: emilevauge/whoami
--advertise-client-urls http://localhost:4001
--initial-cluster etcd1=http://localhost:7001,etcd2=http://localhost:7002,etcd3=http://localhost:7003 whoami3:
etcd2: image: emilevauge/whoami
image: quay.io/coreos/etcd:v2.2.0
net: "host" whoami4:
command: > image: emilevauge/whoami
--name etcd2
--listen-peer-urls http://localhost:7002
--listen-client-urls http://localhost:4002
--initial-advertise-peer-urls http://localhost:7002
--advertise-client-urls http://localhost:4002
--initial-cluster etcd1=http://localhost:7001,etcd2=http://localhost:7002,etcd3=http://localhost:7003
etcd3:
image: quay.io/coreos/etcd:v2.2.0
net: "host"
command: >
--name etcd3
--listen-peer-urls http://localhost:7003
--listen-client-urls http://localhost:4003
--initial-advertise-peer-urls http://localhost:7003
--advertise-client-urls http://localhost:4003
--initial-cluster etcd1=http://localhost:7001,etcd2=http://localhost:7002,etcd3=http://localhost:7003

50
integration/utils/try.go Normal file
View file

@ -0,0 +1,50 @@
package utils
import (
"errors"
"github.com/cenkalti/backoff"
"net/http"
"strconv"
"time"
)
// TryRequest try operation timeout, and retry backoff
func TryRequest(url string, timeout time.Duration, condition Condition) error {
exponentialBackOff := backoff.NewExponentialBackOff()
exponentialBackOff.MaxElapsedTime = timeout
var res *http.Response
err := backoff.Retry(func() error {
var err error
res, err = http.Get(url)
if err != nil {
return err
}
return condition(res)
}, exponentialBackOff)
return err
}
// Try try operation timeout, and retry backoff
func Try(timeout time.Duration, operation func() error) error {
exponentialBackOff := backoff.NewExponentialBackOff()
exponentialBackOff.MaxElapsedTime = timeout
err := backoff.Retry(operation, exponentialBackOff)
return err
}
// Condition is a retry condition function.
// It receives a response, and returns an error
// if the response failed the condition.
type Condition func(*http.Response) error
// ErrorIfStatusCodeIsNot returns a retry condition function.
// The condition returns an error
// if the given response's status code is not the given HTTP status code.
func ErrorIfStatusCodeIsNot(status int) Condition {
return func(res *http.Response) error {
if res.StatusCode != status {
return errors.New("Bad status. Got: " + res.Status + ", expected:" + strconv.Itoa(status))
}
return nil
}
}

View file

@ -38,12 +38,11 @@ type KvTLS struct {
InsecureSkipVerify bool InsecureSkipVerify bool
} }
func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) { func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) error {
operation := func() error { operation := func() error {
events, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */) events, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}))
if err != nil { if err != nil {
log.Errorf("Failed to WatchTree %s", err) return fmt.Errorf("Failed to KV WatchTree: %v", err)
return err
} }
for { for {
select { select {
@ -65,12 +64,13 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix
} }
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("KV connection error %+v, retrying in %s", err, time) log.Errorf("KV connection error: %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
if err != nil { if err != nil {
log.Fatalf("Cannot connect to KV server %+v", err) return fmt.Errorf("Cannot connect to KV server: %v", err)
} }
return nil
} }
func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
@ -112,15 +112,18 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
storeConfig, storeConfig,
) )
if err != nil { if err != nil {
return err return fmt.Errorf("Failed to Connect to KV store: %v", err)
} }
if _, err := kv.List(""); err != nil { if _, err := kv.Exists("qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"); err != nil {
return err return fmt.Errorf("Failed to test KV store connection: %v", err)
} }
provider.kvclient = kv provider.kvclient = kv
if provider.Watch { if provider.Watch {
pool.Go(func(stop chan bool) { pool.Go(func(stop chan bool) {
provider.watchKv(configurationChan, provider.Prefix, stop) err := provider.watchKv(configurationChan, provider.Prefix, stop)
if err != nil {
log.Errorf("Cannot watch KV store: %v", err)
}
}) })
} }
configuration := provider.loadConfig() configuration := provider.loadConfig()
@ -131,11 +134,11 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
return nil return nil
} }
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("KV connection error %+v, retrying in %s", err, time) log.Errorf("KV connection error: %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
if err != nil { if err != nil {
log.Fatalf("Cannot connect to KV server %+v", err) return fmt.Errorf("Cannot connect to KV server: %v", err)
} }
return nil return nil
} }
@ -170,7 +173,7 @@ func (provider *Kv) list(keys ...string) []string {
} }
directoryKeys := make(map[string]string) directoryKeys := make(map[string]string)
for _, key := range keysPairs { for _, key := range keysPairs {
directory := strings.Split(strings.TrimPrefix(key.Key, strings.TrimPrefix(joinedKeys, "/")), "/")[0] directory := strings.Split(strings.TrimPrefix(key.Key, joinedKeys), "/")[0]
directoryKeys[directory] = joinedKeys + directory directoryKeys[directory] = joinedKeys + directory
} }
return fun.Values(directoryKeys).([]string) return fun.Values(directoryKeys).([]string)
@ -178,7 +181,7 @@ func (provider *Kv) list(keys ...string) []string {
func (provider *Kv) get(defaultValue string, keys ...string) string { func (provider *Kv) get(defaultValue string, keys ...string) string {
joinedKeys := strings.Join(keys, "") joinedKeys := strings.Join(keys, "")
keyPair, err := provider.kvclient.Get(joinedKeys) keyPair, err := provider.kvclient.Get(strings.TrimPrefix(joinedKeys, "/"))
if err != nil { if err != nil {
log.Warnf("Error getting key %s %s, setting default %s", joinedKeys, err, defaultValue) log.Warnf("Error getting key %s %s, setting default %s", joinedKeys, err, defaultValue)
return defaultValue return defaultValue