diff --git a/integration/consul_catalog_test.go b/integration/consul_catalog_test.go index 0241d8ab3..40765059c 100644 --- a/integration/consul_catalog_test.go +++ b/integration/consul_catalog_test.go @@ -419,6 +419,46 @@ func (s *ConsulCatalogSuite) TestCircuitBreaker(c *check.C) { c.Assert(err, checker.IsNil) } +func (s *ConsulCatalogSuite) TestRefreshConfigPortChange(c *check.C) { + cmd, display := s.traefikCmd( + withConfigFile("fixtures/consul_catalog/simple.toml"), + "--consulCatalog", + "--consulCatalog.exposedByDefault=false", + "--consulCatalog.watch=true", + "--consulCatalog.endpoint="+s.consulIP+":8500", + "--consulCatalog.domain=consul.localhost") + defer display(c) + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer cmd.Process.Kill() + + nginx := s.composeProject.Container(c, "nginx1") + + err = s.registerService("test", nginx.NetworkSettings.IPAddress, 81, []string{"name=nginx1", "traefik.enable=true"}) + c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) + + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil) + c.Assert(err, checker.IsNil) + req.Host = "test.consul.localhost" + + err = try.Request(req, 20*time.Second, try.StatusCodeIs(http.StatusBadGateway)) + c.Assert(err, checker.IsNil) + + err = try.GetRequest("http://127.0.0.1:8080/api/providers/consul_catalog/backends", 5*time.Second, try.BodyContains("nginx1")) + c.Assert(err, checker.IsNil) + + err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80, []string{"name=nginx1", "traefik.enable=true"}) + c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) + + defer s.deregisterService("test", nginx.NetworkSettings.IPAddress) + + err = try.GetRequest("http://127.0.0.1:8080/api/providers/consul_catalog/backends", 60*time.Second, try.BodyContains("nginx1")) + c.Assert(err, checker.IsNil) + + err = try.Request(req, 20*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) + c.Assert(err, checker.IsNil) +} + func (s *ConsulCatalogSuite) TestRetryWithConsulServer(c *check.C) { //Scale consul to 0 to be able to start traefik before and test retry s.composeProject.Scale(c, "consul", 0) diff --git a/provider/consul/consul_catalog.go b/provider/consul/consul_catalog.go index 8f0510510..b87d63e90 100644 --- a/provider/consul/consul_catalog.go +++ b/provider/consul/consul_catalog.go @@ -78,8 +78,11 @@ func (a nodeSorter) Less(i int, j int) bool { } func hasChanged(current map[string]Service, previous map[string]Service) bool { + if len(current) != len(previous) { + return true + } addedServiceKeys, removedServiceKeys := getChangedServiceKeys(current, previous) - return len(removedServiceKeys) > 0 || len(addedServiceKeys) > 0 || hasNodeOrTagsChanged(current, previous) + return len(removedServiceKeys) > 0 || len(addedServiceKeys) > 0 || hasServiceChanged(current, previous) } func getChangedServiceKeys(current map[string]Service, previous map[string]Service) ([]string, []string) { @@ -92,20 +95,24 @@ func getChangedServiceKeys(current map[string]Service, previous map[string]Servi return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string) } -func hasNodeOrTagsChanged(current map[string]Service, previous map[string]Service) bool { - var added []string - var removed []string +func hasServiceChanged(current map[string]Service, previous map[string]Service) bool { for key, value := range current { if prevValue, ok := previous[key]; ok { addedNodesKeys, removedNodesKeys := getChangedStringKeys(value.Nodes, prevValue.Nodes) - added = append(added, addedNodesKeys...) - removed = append(removed, removedNodesKeys...) + if len(addedNodesKeys) > 0 || len(removedNodesKeys) > 0 { + return true + } addedTagsKeys, removedTagsKeys := getChangedStringKeys(value.Tags, prevValue.Tags) - added = append(added, addedTagsKeys...) - removed = append(removed, removedTagsKeys...) + if len(addedTagsKeys) > 0 || len(removedTagsKeys) > 0 { + return true + } + addedPortsKeys, removedPortsKeys := getChangedIntKeys(value.Ports, prevValue.Ports) + if len(addedPortsKeys) > 0 || len(removedPortsKeys) > 0 { + return true + } } } - return len(added) > 0 || len(removed) > 0 + return false } func getChangedStringKeys(currState []string, prevState []string) ([]string, []string) { @@ -118,6 +125,16 @@ func getChangedStringKeys(currState []string, prevState []string) ([]string, []s return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string) } +func getChangedIntKeys(currState []int, prevState []int) ([]int, []int) { + currKeySet := fun.Set(currState).(map[int]bool) + prevKeySet := fun.Set(prevState).(map[int]bool) + + addedKeys := fun.Difference(currKeySet, prevKeySet).(map[int]bool) + removedKeys := fun.Difference(prevKeySet, currKeySet).(map[int]bool) + + return fun.Keys(addedKeys).([]int), fun.Keys(removedKeys).([]int) +} + func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) { health := p.client.Health() catalog := p.client.Catalog() @@ -194,6 +211,7 @@ type Service struct { Name string Tags []string Nodes []string + Ports []int } func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) { @@ -235,14 +253,17 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c return } nodesID := getServiceIds(nodes) + ports := getServicePorts(nodes) if service, ok := current[key]; ok { service.Tags = value service.Nodes = nodesID + service.Ports = ports } else { service := Service{ Name: key, Tags: value, Nodes: nodesID, + Ports: ports, } current[key] = service } @@ -267,6 +288,14 @@ func getServiceIds(services []*api.CatalogService) []string { return serviceIds } +func getServicePorts(services []*api.CatalogService) []int { + var servicePorts []int + for _, service := range services { + servicePorts = append(servicePorts, service.ServicePort) + } + return servicePorts +} + func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) { health := p.client.Health() opts := &api.QueryOptions{} @@ -279,7 +308,7 @@ func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) { return p.nodeFilter(service, node) }, data).([]*api.ServiceEntry) - //Merge tags of nodes matching constraints, in a single slice. + // Merge tags of nodes matching constraints, in a single slice. tags := fun.Foldl(func(node *api.ServiceEntry, set []string) []string { return fun.Keys(fun.Union( fun.Set(set), @@ -480,8 +509,8 @@ func (p *CatalogProvider) buildConfig(catalog []catalogUpdate) *types.Configurat "hasMaxconnAttributes": p.hasMaxconnAttributes, } - allNodes := []*api.ServiceEntry{} - services := []*serviceUpdate{} + var allNodes []*api.ServiceEntry + var services []*serviceUpdate for _, info := range catalog { if len(info.Nodes) > 0 { services = append(services, info.Service) @@ -519,7 +548,7 @@ func (p *CatalogProvider) hasMaxconnAttributes(attributes []string) bool { func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate, error) { visited := make(map[string]bool) - nodes := []catalogUpdate{} + var nodes []catalogUpdate for service := range index { name := strings.ToLower(service) if !strings.Contains(name, " ") && !visited[name] { @@ -555,7 +584,7 @@ func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, st return nil case index, ok := <-watchCh: if !ok { - return errors.New("Consul service list nil") + return errors.New("consul service list nil") } log.Debug("List of services changed") nodes, err := p.getNodes(index) diff --git a/provider/consul/consul_catalog_test.go b/provider/consul/consul_catalog_test.go index 3e6899f80..ce86cf74e 100644 --- a/provider/consul/consul_catalog_test.go +++ b/provider/consul/consul_catalog_test.go @@ -1094,7 +1094,7 @@ func TestConsulCatalogHasNodeOrTagschanged(t *testing.T) { expected: false, }, { - desc: "Change detected con tags", + desc: "Change detected on tags", current: map[string]Service{ "foo-service": { Name: "foo", @@ -1111,6 +1111,66 @@ func TestConsulCatalogHasNodeOrTagschanged(t *testing.T) { }, expected: true, }, + { + desc: "Change detected on ports", + current: map[string]Service{ + "foo-service": { + Name: "foo", + Nodes: []string{"node1"}, + Tags: []string{"foo=bar"}, + Ports: []int{80}, + }, + }, + previous: map[string]Service{ + "foo-service": { + Name: "foo", + Nodes: []string{"node1"}, + Tags: []string{"foo"}, + Ports: []int{81}, + }, + }, + expected: true, + }, + { + desc: "Change detected on ports", + current: map[string]Service{ + "foo-service": { + Name: "foo", + Nodes: []string{"node1"}, + Tags: []string{"foo=bar"}, + Ports: []int{80}, + }, + }, + previous: map[string]Service{ + "foo-service": { + Name: "foo", + Nodes: []string{"node1"}, + Tags: []string{"foo"}, + Ports: []int{81, 82}, + }, + }, + expected: true, + }, + { + desc: "No Change detected", + current: map[string]Service{ + "foo-service": { + Name: "foo", + Nodes: []string{"node1"}, + Tags: []string{"foo"}, + Ports: []int{80}, + }, + }, + previous: map[string]Service{ + "foo-service": { + Name: "foo", + Nodes: []string{"node1"}, + Tags: []string{"foo"}, + Ports: []int{80}, + }, + }, + expected: false, + }, } for _, test := range testCases { @@ -1118,7 +1178,7 @@ func TestConsulCatalogHasNodeOrTagschanged(t *testing.T) { t.Run(test.desc, func(t *testing.T) { t.Parallel() - actual := hasNodeOrTagsChanged(test.current, test.previous) + actual := hasServiceChanged(test.current, test.previous) assert.Equal(t, test.expected, actual) }) }