Fix consul catalog retry

This commit is contained in:
Michael 2017-10-16 16:58:03 +02:00 committed by Traefiker
parent aa308b7a3a
commit 3afd6024b5
2 changed files with 78 additions and 13 deletions

View file

@ -24,19 +24,20 @@ func (s *ConsulCatalogSuite) SetUpSuite(c *check.C) {
s.composeProject.Start(c) s.composeProject.Start(c)
consul := s.composeProject.Container(c, "consul") consul := s.composeProject.Container(c, "consul")
s.consulIP = consul.NetworkSettings.IPAddress s.consulIP = consul.NetworkSettings.IPAddress
config := api.DefaultConfig() config := api.DefaultConfig()
config.Address = s.consulIP + ":8500" config.Address = s.consulIP + ":8500"
consulClient, err := api.NewClient(config) s.createConsulClient(config, c)
if err != nil {
c.Fatalf("Error creating consul client. %v", err)
}
s.consulClient = consulClient
// Wait for consul to elect itself leader // Wait for consul to elect itself leader
err = try.Do(3*time.Second, func() error { err := s.waitToElectConsulLeader()
leader, err := consulClient.Status().Leader() c.Assert(err, checker.IsNil)
}
func (s *ConsulCatalogSuite) waitToElectConsulLeader() error {
return try.Do(3*time.Second, func() error {
leader, err := s.consulClient.Status().Leader()
if err != nil || len(leader) == 0 { if err != nil || len(leader) == 0 {
return fmt.Errorf("Leader not found. %v", err) return fmt.Errorf("Leader not found. %v", err)
@ -44,7 +45,17 @@ func (s *ConsulCatalogSuite) SetUpSuite(c *check.C) {
return nil return nil
}) })
c.Assert(err, checker.IsNil) }
func (s *ConsulCatalogSuite) createConsulClient(config *api.Config, c *check.C) *api.Client {
consulClient, err := api.NewClient(config)
if err != nil {
c.Fatalf("Error creating consul client. %v", err)
}
s.consulClient = consulClient
return consulClient
}
func (s *ConsulCatalogSuite) startConsulService(c *check.C) {
} }
func (s *ConsulCatalogSuite) registerService(name string, address string, port int, tags []string) error { func (s *ConsulCatalogSuite) registerService(name string, address string, port int, tags []string) error {
@ -332,3 +343,50 @@ func (s *ConsulCatalogSuite) TestBasicAuthSimpleService(c *check.C) {
err = try.Request(req, 5*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) err = try.Request(req, 5*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
} }
func (s *ConsulCatalogSuite) TestRetryWithConsulServer(c *check.C) {
//Scale consul to 0 to be able to start traefik before and test retry
s.composeProject.Scale(c, "consul", 0)
cmd, display := s.traefikCmd(
withConfigFile("fixtures/consul_catalog/simple.toml"),
"--consulCatalog",
"--consulCatalog.watch=false",
"--consulCatalog.exposedByDefault=true",
"--consulCatalog.endpoint="+s.consulIP+":8500",
"--consulCatalog.domain=consul.localhost")
defer display(c)
err := cmd.Start()
c.Assert(err, checker.IsNil)
defer cmd.Process.Kill()
// Wait for Traefik to turn ready.
err = try.GetRequest("http://127.0.0.1:8000/", 2*time.Second, try.StatusCodeIs(http.StatusNotFound))
c.Assert(err, checker.IsNil)
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil)
c.Assert(err, checker.IsNil)
req.Host = "test.consul.localhost"
// Request should fail
err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusNotFound), try.HasBody())
c.Assert(err, checker.IsNil)
// Scale consul to 1
s.composeProject.Scale(c, "consul", 1)
s.waitToElectConsulLeader()
nginx := s.composeProject.Container(c, "nginx1")
// Register service
err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{})
c.Assert(err, checker.IsNil, check.Commentf("Error registering service"))
// Provider consul catalog should be present
err = try.GetRequest("http://127.0.0.1:8080/api/providers", 10*time.Second, try.BodyContains("consul_catalog"))
c.Assert(err, checker.IsNil)
// Should be ok
err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
c.Assert(err, checker.IsNil)
}

View file

@ -110,7 +110,7 @@ func getChangedHealthyKeys(currState []string, prevState []string) ([]string, []
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string) return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
} }
func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string) { func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) {
health := p.client.Health() health := p.client.Health()
catalog := p.client.Catalog() catalog := p.client.Catalog()
@ -131,6 +131,7 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<
healthyState, meta, err := health.State("passing", options) healthyState, meta, err := health.State("passing", options)
if err != nil { if err != nil {
log.WithError(err).Error("Failed to retrieve health checks") log.WithError(err).Error("Failed to retrieve health checks")
errorCh <- err
return return
} }
@ -154,6 +155,7 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<
data, _, err := catalog.Services(&api.QueryOptions{}) data, _, err := catalog.Services(&api.QueryOptions{})
if err != nil { if err != nil {
log.Errorf("Failed to list services: %s", err) log.Errorf("Failed to list services: %s", err)
errorCh <- err
return return
} }
@ -186,7 +188,7 @@ type Service struct {
Nodes []string Nodes []string
} }
func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string) { func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) {
catalog := p.client.Catalog() catalog := p.client.Catalog()
safe.Go(func() { safe.Go(func() {
@ -205,6 +207,7 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c
data, meta, err := catalog.Services(options) data, meta, err := catalog.Services(options)
if err != nil { if err != nil {
log.Errorf("Failed to list services: %s", err) log.Errorf("Failed to list services: %s", err)
errorCh <- err
return return
} }
@ -220,6 +223,7 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c
nodes, _, err := catalog.Service(key, "", &api.QueryOptions{}) nodes, _, err := catalog.Service(key, "", &api.QueryOptions{})
if err != nil { if err != nil {
log.Errorf("Failed to get detail of service %s: %s", key, err) log.Errorf("Failed to get detail of service %s: %s", key, err)
errorCh <- err
return return
} }
nodesID := getServiceIds(nodes) nodesID := getServiceIds(nodes)
@ -531,9 +535,10 @@ func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate,
func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error { func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
watchCh := make(chan map[string][]string) watchCh := make(chan map[string][]string)
errorCh := make(chan error)
p.watchHealthState(stopCh, watchCh) p.watchHealthState(stopCh, watchCh, errorCh)
p.watchCatalogServices(stopCh, watchCh) p.watchCatalogServices(stopCh, watchCh, errorCh)
defer close(stopCh) defer close(stopCh)
defer close(watchCh) defer close(watchCh)
@ -556,6 +561,8 @@ func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, st
ProviderName: "consul_catalog", ProviderName: "consul_catalog",
Configuration: configuration, Configuration: configuration,
} }
case err := <-errorCh:
return err
} }
} }
} }