From f80a6ef2a665729da7dd5b408736235e09f1acd5 Mon Sep 17 00:00:00 2001 From: SALLEYRON Julien Date: Fri, 8 Sep 2017 20:50:04 +0200 Subject: [PATCH] Fix consul catalog refresh problems --- integration/consul_catalog_test.go | 95 +++++++++- .../fixtures/consul_catalog/simple.toml | 3 + .../resources/compose/consul_catalog.yml | 2 +- provider/consul/consul_catalog.go | 98 ++++++++-- provider/consul/consul_catalog_test.go | 172 +++++++++--------- 5 files changed, 259 insertions(+), 111 deletions(-) diff --git a/integration/consul_catalog_test.go b/integration/consul_catalog_test.go index 9fad9e974..e0aa011c6 100644 --- a/integration/consul_catalog_test.go +++ b/integration/consul_catalog_test.go @@ -66,6 +66,30 @@ func (s *ConsulCatalogSuite) registerService(name string, address string, port i return err } +func (s *ConsulCatalogSuite) registerAgentService(name string, address string, port int, tags []string) error { + agent := s.consulClient.Agent() + err := agent.ServiceRegister( + &api.AgentServiceRegistration{ + ID: address, + Tags: tags, + Name: name, + Address: address, + Port: port, + Check: &api.AgentServiceCheck{ + HTTP: "http://" + address, + Interval: "10s", + }, + }, + ) + return err +} + +func (s *ConsulCatalogSuite) deregisterAgentService(address string) error { + agent := s.consulClient.Agent() + err := agent.ServiceDeregister(address) + return err +} + func (s *ConsulCatalogSuite) deregisterService(name string, address string) error { catalog := s.consulClient.Catalog() _, err := catalog.Deregister( @@ -104,7 +128,7 @@ func (s *ConsulCatalogSuite) TestSingleService(c *check.C) { c.Assert(err, checker.IsNil) defer cmd.Process.Kill() - nginx := s.composeProject.Container(c, "nginx") + nginx := s.composeProject.Container(c, "nginx1") err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{}) c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) @@ -114,7 +138,7 @@ func (s *ConsulCatalogSuite) TestSingleService(c *check.C) { c.Assert(err, checker.IsNil) req.Host = "test.consul.localhost" - err = try.Request(req, 5*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) + err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) c.Assert(err, checker.IsNil) } @@ -129,7 +153,7 @@ func (s *ConsulCatalogSuite) TestExposedByDefaultFalseSingleService(c *check.C) c.Assert(err, checker.IsNil) defer cmd.Process.Kill() - nginx := s.composeProject.Container(c, "nginx") + nginx := s.composeProject.Container(c, "nginx1") err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{}) c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) @@ -154,7 +178,7 @@ func (s *ConsulCatalogSuite) TestExposedByDefaultFalseSimpleServiceMultipleNode( c.Assert(err, checker.IsNil) defer cmd.Process.Kill() - nginx := s.composeProject.Container(c, "nginx") + nginx := s.composeProject.Container(c, "nginx1") nginx2 := s.composeProject.Container(c, "nginx2") err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{}) @@ -184,14 +208,14 @@ func (s *ConsulCatalogSuite) TestExposedByDefaultTrueSimpleServiceMultipleNode(c c.Assert(err, checker.IsNil) defer cmd.Process.Kill() - nginx := s.composeProject.Container(c, "nginx") + nginx := s.composeProject.Container(c, "nginx1") nginx2 := s.composeProject.Container(c, "nginx2") - err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{}) + err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{"name=nginx1"}) c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) defer s.deregisterService("test", nginx.NetworkSettings.IPAddress) - err = s.registerService("test", nginx2.NetworkSettings.IPAddress, 80, []string{}) + err = s.registerService("test", nginx2.NetworkSettings.IPAddress, 80, []string{"name=nginx2"}) c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) defer s.deregisterService("test", nginx2.NetworkSettings.IPAddress) @@ -201,6 +225,61 @@ func (s *ConsulCatalogSuite) TestExposedByDefaultTrueSimpleServiceMultipleNode(c err = try.Request(req, 5*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) c.Assert(err, checker.IsNil) + + err = try.GetRequest("http://127.0.0.1:8080/api/providers/consul_catalog/backends", 60*time.Second, try.BodyContains("nginx1", "nginx2")) + c.Assert(err, checker.IsNil) + +} + +func (s *ConsulCatalogSuite) TestRefreshConfigWithMultipleNodeWithoutHealthCheck(c *check.C) { + cmd, _ := s.cmdTraefik( + withConfigFile("fixtures/consul_catalog/simple.toml"), + "--consulCatalog", + "--consulCatalog.exposedByDefault=true", + "--consulCatalog.endpoint="+s.consulIP+":8500", + "--consulCatalog.domain=consul.localhost") + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer cmd.Process.Kill() + + nginx := s.composeProject.Container(c, "nginx1") + nginx2 := s.composeProject.Container(c, "nginx2") + + err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{"name=nginx1"}) + c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) + defer s.deregisterService("test", nginx.NetworkSettings.IPAddress) + + err = s.registerAgentService("test", nginx.NetworkSettings.IPAddress, 80, []string{"name=nginx1"}) + c.Assert(err, checker.IsNil, check.Commentf("Error registering agent service")) + + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil) + c.Assert(err, checker.IsNil) + req.Host = "test.consul.localhost" + + err = try.Request(req, 5*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) + c.Assert(err, checker.IsNil) + + err = try.GetRequest("http://127.0.0.1:8080/api/providers/consul_catalog/backends", 60*time.Second, try.BodyContains("nginx1")) + c.Assert(err, checker.IsNil) + + err = s.registerService("test", nginx2.NetworkSettings.IPAddress, 80, []string{"name=nginx2"}) + c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) + + err = try.GetRequest("http://127.0.0.1:8080/api/providers/consul_catalog/backends", 60*time.Second, try.BodyContains("nginx1", "nginx2")) + c.Assert(err, checker.IsNil) + + s.deregisterService("test", nginx2.NetworkSettings.IPAddress) + + err = try.GetRequest("http://127.0.0.1:8080/api/providers/consul_catalog/backends", 60*time.Second, try.BodyContains("nginx1")) + c.Assert(err, checker.IsNil) + + err = s.registerService("test", nginx2.NetworkSettings.IPAddress, 80, []string{"name=nginx2"}) + c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) + defer s.deregisterService("test", nginx2.NetworkSettings.IPAddress) + + err = try.GetRequest("http://127.0.0.1:8080/api/providers/consul_catalog/backends", 60*time.Second, try.BodyContains("nginx1", "nginx2")) + c.Assert(err, checker.IsNil) + } func (s *ConsulCatalogSuite) TestBasicAuthSimpleService(c *check.C) { @@ -218,7 +297,7 @@ func (s *ConsulCatalogSuite) TestBasicAuthSimpleService(c *check.C) { s.displayTraefikLog(c, output) }() - nginx := s.composeProject.Container(c, "nginx") + nginx := s.composeProject.Container(c, "nginx1") err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{ "traefik.frontend.auth.basic=test:$2a$06$O5NksJPAcgrC9MuANkSoE.Xe9DSg7KcLLFYNr1Lj6hPcMmvgwxhme,test2:$2y$10$xP1SZ70QbZ4K2bTGKJOhpujkpcLxQcB3kEPF6XAV19IdcqsZTyDEe", diff --git a/integration/fixtures/consul_catalog/simple.toml b/integration/fixtures/consul_catalog/simple.toml index 55fb8e989..45bca454f 100644 --- a/integration/fixtures/consul_catalog/simple.toml +++ b/integration/fixtures/consul_catalog/simple.toml @@ -1,6 +1,9 @@ defaultEntryPoints = ["http"] logLevel = "DEBUG" +[web] + address = ":8080" + [entryPoints] [entryPoints.http] address = ":8000" diff --git a/integration/resources/compose/consul_catalog.yml b/integration/resources/compose/consul_catalog.yml index 54f874f2d..b755d2ad5 100644 --- a/integration/resources/compose/consul_catalog.yml +++ b/integration/resources/compose/consul_catalog.yml @@ -11,7 +11,7 @@ consul: - "8301/udp" - "8302" - "8302/udp" -nginx: +nginx1: image: nginx:alpine nginx2: image: nginx:alpine diff --git a/provider/consul/consul_catalog.go b/provider/consul/consul_catalog.go index 86d87eeda..a6fe6f447 100644 --- a/provider/consul/consul_catalog.go +++ b/provider/consul/consul_catalog.go @@ -77,7 +77,7 @@ func (a nodeSorter) Less(i int, j int) bool { return lentr.Service.Port < rentr.Service.Port } -func getChangedKeys(currState map[string][]string, prevState map[string][]string) ([]string, []string) { +func getChangedServiceKeys(currState map[string]Service, prevState map[string]Service) ([]string, []string) { currKeySet := fun.Set(fun.Keys(currState).([]string)).(map[string]bool) prevKeySet := fun.Set(fun.Keys(prevState).([]string)).(map[string]bool) @@ -87,13 +87,36 @@ func getChangedKeys(currState map[string][]string, prevState map[string][]string return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string) } +func getChangedServiceNodeKeys(currState map[string]Service, prevState map[string]Service) ([]string, []string) { + var addedNodeKeys []string + var removedNodeKeys []string + for key, value := range currState { + if prevValue, ok := prevState[key]; ok { + addedKeys, removedKeys := getChangedHealthyKeys(value.Nodes, prevValue.Nodes) + addedNodeKeys = append(addedKeys) + removedNodeKeys = append(removedKeys) + } + } + return addedNodeKeys, removedNodeKeys +} + +func getChangedHealthyKeys(currState []string, prevState []string) ([]string, []string) { + currKeySet := fun.Set(currState).(map[string]bool) + prevKeySet := fun.Set(prevState).(map[string]bool) + + addedKeys := fun.Difference(currKeySet, prevKeySet).(map[string]bool) + removedKeys := fun.Difference(prevKeySet, currKeySet).(map[string]bool) + + return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string) +} + func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string) { health := p.client.Health() catalog := p.client.Catalog() safe.Go(func() { // variable to hold previous state - var flashback map[string][]string + var flashback []string options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} @@ -105,14 +128,20 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan< } // Listening to changes that leads to `passing` state or degrades from it. - // The call is used just as a trigger for further actions - // (intentionally there is no interest in the received data). - _, meta, err := health.State("passing", options) + healthyState, meta, err := health.State("passing", options) if err != nil { log.WithError(err).Error("Failed to retrieve health checks") return } + var current []string + if healthyState != nil { + for _, healthy := range healthyState { + current = append(current, healthy.ServiceID) + } + + } + // If LastIndex didn't change then it means `Get` returned // because of the WaitTime and the key didn't changed. if options.WaitIndex == meta.LastIndex { @@ -132,30 +161,38 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan< // A critical note is that the return of a blocking request is no guarantee of a change. // It is possible that there was an idempotent write that does not affect the result of the query. // Thus it is required to do extra check for changes... - addedKeys, removedKeys := getChangedKeys(data, flashback) + addedKeys, removedKeys := getChangedHealthyKeys(current, flashback) if len(addedKeys) > 0 { log.WithField("DiscoveredServices", addedKeys).Debug("Health State change detected.") watchCh <- data - flashback = data + flashback = current } if len(removedKeys) > 0 { log.WithField("MissingServices", removedKeys).Debug("Health State change detected.") watchCh <- data - flashback = data + flashback = current } } } }) } +// Service represent a Consul service. +type Service struct { + Name string + Tags []string + Nodes []string +} + func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string) { catalog := p.client.Catalog() safe.Go(func() { + current := make(map[string]Service) // variable to hold previous state - var flashback map[string][]string + var flashback map[string]Service options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} @@ -179,26 +216,55 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c options.WaitIndex = meta.LastIndex if data != nil { + + for key, value := range data { + nodes, _, err := catalog.Service(key, "", options) + if err != nil { + log.Errorf("Failed to get detail of service %s: %s", key, err) + return + } + nodesID := getServiceIds(nodes) + if service, ok := current[key]; ok { + service.Tags = value + service.Nodes = nodesID + } else { + service := Service{ + Name: key, + Tags: value, + Nodes: nodesID, + } + current[key] = service + } + } // A critical note is that the return of a blocking request is no guarantee of a change. // It is possible that there was an idempotent write that does not affect the result of the query. // Thus it is required to do extra check for changes... - addedKeys, removedKeys := getChangedKeys(data, flashback) + addedServiceKeys, removedServiceKeys := getChangedServiceKeys(current, flashback) - if len(addedKeys) > 0 { - log.WithField("DiscoveredServices", addedKeys).Debug("Catalog Services change detected.") + addedServiceNodeKeys, removedServiceNodeKeys := getChangedServiceNodeKeys(current, flashback) + + if len(addedServiceKeys) > 0 || len(addedServiceNodeKeys) > 0 { + log.WithField("DiscoveredServices", addedServiceKeys).Debug("Catalog Services change detected.") watchCh <- data - flashback = data + flashback = current } - if len(removedKeys) > 0 { - log.WithField("MissingServices", removedKeys).Debug("Catalog Services change detected.") + if len(removedServiceKeys) > 0 || len(removedServiceNodeKeys) > 0 { + log.WithField("MissingServices", removedServiceKeys).Debug("Catalog Services change detected.") watchCh <- data - flashback = data + flashback = current } } } }) } +func getServiceIds(services []*api.CatalogService) []string { + var serviceIds []string + for _, service := range services { + serviceIds = append(serviceIds, service.ServiceID) + } + return serviceIds +} func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) { health := p.client.Health() diff --git a/provider/consul/consul_catalog_test.go b/provider/consul/consul_catalog_test.go index 23f3f1422..7cd3c25eb 100644 --- a/provider/consul/consul_catalog_test.go +++ b/provider/consul/consul_catalog_test.go @@ -613,8 +613,8 @@ func TestConsulCatalogNodeSorter(t *testing.T) { func TestConsulCatalogGetChangedKeys(t *testing.T) { type Input struct { - currState map[string][]string - prevState map[string][]string + currState map[string]Service + prevState map[string]Service } type Output struct { @@ -628,37 +628,37 @@ func TestConsulCatalogGetChangedKeys(t *testing.T) { }{ { input: Input{ - currState: map[string][]string{ - "foo-service": {"v1"}, - "bar-service": {"v1"}, - "baz-service": {"v1"}, - "qux-service": {"v1"}, - "quux-service": {"v1"}, - "quuz-service": {"v1"}, - "corge-service": {"v1"}, - "grault-service": {"v1"}, - "garply-service": {"v1"}, - "waldo-service": {"v1"}, - "fred-service": {"v1"}, - "plugh-service": {"v1"}, - "xyzzy-service": {"v1"}, - "thud-service": {"v1"}, + currState: map[string]Service{ + "foo-service": {Name: "v1"}, + "bar-service": {Name: "v1"}, + "baz-service": {Name: "v1"}, + "qux-service": {Name: "v1"}, + "quux-service": {Name: "v1"}, + "quuz-service": {Name: "v1"}, + "corge-service": {Name: "v1"}, + "grault-service": {Name: "v1"}, + "garply-service": {Name: "v1"}, + "waldo-service": {Name: "v1"}, + "fred-service": {Name: "v1"}, + "plugh-service": {Name: "v1"}, + "xyzzy-service": {Name: "v1"}, + "thud-service": {Name: "v1"}, }, - prevState: map[string][]string{ - "foo-service": {"v1"}, - "bar-service": {"v1"}, - "baz-service": {"v1"}, - "qux-service": {"v1"}, - "quux-service": {"v1"}, - "quuz-service": {"v1"}, - "corge-service": {"v1"}, - "grault-service": {"v1"}, - "garply-service": {"v1"}, - "waldo-service": {"v1"}, - "fred-service": {"v1"}, - "plugh-service": {"v1"}, - "xyzzy-service": {"v1"}, - "thud-service": {"v1"}, + prevState: map[string]Service{ + "foo-service": {Name: "v1"}, + "bar-service": {Name: "v1"}, + "baz-service": {Name: "v1"}, + "qux-service": {Name: "v1"}, + "quux-service": {Name: "v1"}, + "quuz-service": {Name: "v1"}, + "corge-service": {Name: "v1"}, + "grault-service": {Name: "v1"}, + "garply-service": {Name: "v1"}, + "waldo-service": {Name: "v1"}, + "fred-service": {Name: "v1"}, + "plugh-service": {Name: "v1"}, + "xyzzy-service": {Name: "v1"}, + "thud-service": {Name: "v1"}, }, }, output: Output{ @@ -668,34 +668,34 @@ func TestConsulCatalogGetChangedKeys(t *testing.T) { }, { input: Input{ - currState: map[string][]string{ - "foo-service": {"v1"}, - "bar-service": {"v1"}, - "baz-service": {"v1"}, - "qux-service": {"v1"}, - "quux-service": {"v1"}, - "quuz-service": {"v1"}, - "corge-service": {"v1"}, - "grault-service": {"v1"}, - "garply-service": {"v1"}, - "waldo-service": {"v1"}, - "fred-service": {"v1"}, - "plugh-service": {"v1"}, - "xyzzy-service": {"v1"}, - "thud-service": {"v1"}, + currState: map[string]Service{ + "foo-service": {Name: "v1"}, + "bar-service": {Name: "v1"}, + "baz-service": {Name: "v1"}, + "qux-service": {Name: "v1"}, + "quux-service": {Name: "v1"}, + "quuz-service": {Name: "v1"}, + "corge-service": {Name: "v1"}, + "grault-service": {Name: "v1"}, + "garply-service": {Name: "v1"}, + "waldo-service": {Name: "v1"}, + "fred-service": {Name: "v1"}, + "plugh-service": {Name: "v1"}, + "xyzzy-service": {Name: "v1"}, + "thud-service": {Name: "v1"}, }, - prevState: map[string][]string{ - "foo-service": {"v1"}, - "bar-service": {"v1"}, - "baz-service": {"v1"}, - "corge-service": {"v1"}, - "grault-service": {"v1"}, - "garply-service": {"v1"}, - "waldo-service": {"v1"}, - "fred-service": {"v1"}, - "plugh-service": {"v1"}, - "xyzzy-service": {"v1"}, - "thud-service": {"v1"}, + prevState: map[string]Service{ + "foo-service": {Name: "v1"}, + "bar-service": {Name: "v1"}, + "baz-service": {Name: "v1"}, + "corge-service": {Name: "v1"}, + "grault-service": {Name: "v1"}, + "garply-service": {Name: "v1"}, + "waldo-service": {Name: "v1"}, + "fred-service": {Name: "v1"}, + "plugh-service": {Name: "v1"}, + "xyzzy-service": {Name: "v1"}, + "thud-service": {Name: "v1"}, }, }, output: Output{ @@ -705,33 +705,33 @@ func TestConsulCatalogGetChangedKeys(t *testing.T) { }, { input: Input{ - currState: map[string][]string{ - "foo-service": {"v1"}, - "qux-service": {"v1"}, - "quux-service": {"v1"}, - "quuz-service": {"v1"}, - "corge-service": {"v1"}, - "grault-service": {"v1"}, - "garply-service": {"v1"}, - "waldo-service": {"v1"}, - "fred-service": {"v1"}, - "plugh-service": {"v1"}, - "xyzzy-service": {"v1"}, - "thud-service": {"v1"}, + currState: map[string]Service{ + "foo-service": {Name: "v1"}, + "qux-service": {Name: "v1"}, + "quux-service": {Name: "v1"}, + "quuz-service": {Name: "v1"}, + "corge-service": {Name: "v1"}, + "grault-service": {Name: "v1"}, + "garply-service": {Name: "v1"}, + "waldo-service": {Name: "v1"}, + "fred-service": {Name: "v1"}, + "plugh-service": {Name: "v1"}, + "xyzzy-service": {Name: "v1"}, + "thud-service": {Name: "v1"}, }, - prevState: map[string][]string{ - "foo-service": {"v1"}, - "bar-service": {"v1"}, - "baz-service": {"v1"}, - "qux-service": {"v1"}, - "quux-service": {"v1"}, - "quuz-service": {"v1"}, - "corge-service": {"v1"}, - "waldo-service": {"v1"}, - "fred-service": {"v1"}, - "plugh-service": {"v1"}, - "xyzzy-service": {"v1"}, - "thud-service": {"v1"}, + prevState: map[string]Service{ + "foo-service": {Name: "v1"}, + "bar-service": {Name: "v1"}, + "baz-service": {Name: "v1"}, + "qux-service": {Name: "v1"}, + "quux-service": {Name: "v1"}, + "quuz-service": {Name: "v1"}, + "corge-service": {Name: "v1"}, + "waldo-service": {Name: "v1"}, + "fred-service": {Name: "v1"}, + "plugh-service": {Name: "v1"}, + "xyzzy-service": {Name: "v1"}, + "thud-service": {Name: "v1"}, }, }, output: Output{ @@ -742,7 +742,7 @@ func TestConsulCatalogGetChangedKeys(t *testing.T) { } for _, c := range cases { - addedKeys, removedKeys := getChangedKeys(c.input.currState, c.input.prevState) + addedKeys, removedKeys := getChangedServiceKeys(c.input.currState, c.input.prevState) if !reflect.DeepEqual(fun.Set(addedKeys), fun.Set(c.output.addedKeys)) { t.Fatalf("Added keys comparison results: got %q, want %q", addedKeys, c.output.addedKeys)