From 3afd6024b5b11b796f968769d6c252f3455ca1b8 Mon Sep 17 00:00:00 2001 From: Michael Date: Mon, 16 Oct 2017 16:58:03 +0200 Subject: [PATCH] Fix consul catalog retry --- integration/consul_catalog_test.go | 76 ++++++++++++++++++++++++++---- provider/consul/consul_catalog.go | 15 ++++-- 2 files changed, 78 insertions(+), 13 deletions(-) diff --git a/integration/consul_catalog_test.go b/integration/consul_catalog_test.go index 1ec14f6a1..fdd132122 100644 --- a/integration/consul_catalog_test.go +++ b/integration/consul_catalog_test.go @@ -24,19 +24,20 @@ func (s *ConsulCatalogSuite) SetUpSuite(c *check.C) { s.composeProject.Start(c) consul := s.composeProject.Container(c, "consul") - s.consulIP = consul.NetworkSettings.IPAddress config := api.DefaultConfig() config.Address = s.consulIP + ":8500" - consulClient, err := api.NewClient(config) - if err != nil { - c.Fatalf("Error creating consul client. %v", err) - } - s.consulClient = consulClient + s.createConsulClient(config, c) // Wait for consul to elect itself leader - err = try.Do(3*time.Second, func() error { - leader, err := consulClient.Status().Leader() + err := s.waitToElectConsulLeader() + c.Assert(err, checker.IsNil) + +} + +func (s *ConsulCatalogSuite) waitToElectConsulLeader() error { + return try.Do(3*time.Second, func() error { + leader, err := s.consulClient.Status().Leader() if err != nil || len(leader) == 0 { return fmt.Errorf("Leader not found. %v", err) @@ -44,7 +45,17 @@ func (s *ConsulCatalogSuite) SetUpSuite(c *check.C) { return nil }) - c.Assert(err, checker.IsNil) +} +func (s *ConsulCatalogSuite) createConsulClient(config *api.Config, c *check.C) *api.Client { + consulClient, err := api.NewClient(config) + if err != nil { + c.Fatalf("Error creating consul client. %v", err) + } + s.consulClient = consulClient + return consulClient +} +func (s *ConsulCatalogSuite) startConsulService(c *check.C) { + } func (s *ConsulCatalogSuite) registerService(name string, address string, port int, tags []string) error { @@ -332,3 +343,50 @@ func (s *ConsulCatalogSuite) TestBasicAuthSimpleService(c *check.C) { err = try.Request(req, 5*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) c.Assert(err, checker.IsNil) } + +func (s *ConsulCatalogSuite) TestRetryWithConsulServer(c *check.C) { + + //Scale consul to 0 to be able to start traefik before and test retry + s.composeProject.Scale(c, "consul", 0) + + cmd, display := s.traefikCmd( + withConfigFile("fixtures/consul_catalog/simple.toml"), + "--consulCatalog", + "--consulCatalog.watch=false", + "--consulCatalog.exposedByDefault=true", + "--consulCatalog.endpoint="+s.consulIP+":8500", + "--consulCatalog.domain=consul.localhost") + defer display(c) + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer cmd.Process.Kill() + + // Wait for Traefik to turn ready. + err = try.GetRequest("http://127.0.0.1:8000/", 2*time.Second, try.StatusCodeIs(http.StatusNotFound)) + c.Assert(err, checker.IsNil) + + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil) + c.Assert(err, checker.IsNil) + req.Host = "test.consul.localhost" + + // Request should fail + err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusNotFound), try.HasBody()) + c.Assert(err, checker.IsNil) + + // Scale consul to 1 + s.composeProject.Scale(c, "consul", 1) + s.waitToElectConsulLeader() + + nginx := s.composeProject.Container(c, "nginx1") + // Register service + err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{}) + c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) + + // Provider consul catalog should be present + err = try.GetRequest("http://127.0.0.1:8080/api/providers", 10*time.Second, try.BodyContains("consul_catalog")) + c.Assert(err, checker.IsNil) + + // Should be ok + err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) + c.Assert(err, checker.IsNil) +} diff --git a/provider/consul/consul_catalog.go b/provider/consul/consul_catalog.go index 02230a621..868bcd6b4 100644 --- a/provider/consul/consul_catalog.go +++ b/provider/consul/consul_catalog.go @@ -110,7 +110,7 @@ func getChangedHealthyKeys(currState []string, prevState []string) ([]string, [] return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string) } -func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string) { +func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) { health := p.client.Health() catalog := p.client.Catalog() @@ -131,6 +131,7 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan< healthyState, meta, err := health.State("passing", options) if err != nil { log.WithError(err).Error("Failed to retrieve health checks") + errorCh <- err return } @@ -154,6 +155,7 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan< data, _, err := catalog.Services(&api.QueryOptions{}) if err != nil { log.Errorf("Failed to list services: %s", err) + errorCh <- err return } @@ -186,7 +188,7 @@ type Service struct { Nodes []string } -func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string) { +func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) { catalog := p.client.Catalog() safe.Go(func() { @@ -205,6 +207,7 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c data, meta, err := catalog.Services(options) if err != nil { log.Errorf("Failed to list services: %s", err) + errorCh <- err return } @@ -220,6 +223,7 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c nodes, _, err := catalog.Service(key, "", &api.QueryOptions{}) if err != nil { log.Errorf("Failed to get detail of service %s: %s", key, err) + errorCh <- err return } nodesID := getServiceIds(nodes) @@ -531,9 +535,10 @@ func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate, func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error { stopCh := make(chan struct{}) watchCh := make(chan map[string][]string) + errorCh := make(chan error) - p.watchHealthState(stopCh, watchCh) - p.watchCatalogServices(stopCh, watchCh) + p.watchHealthState(stopCh, watchCh, errorCh) + p.watchCatalogServices(stopCh, watchCh, errorCh) defer close(stopCh) defer close(watchCh) @@ -556,6 +561,8 @@ func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, st ProviderName: "consul_catalog", Configuration: configuration, } + case err := <-errorCh: + return err } } }