diff --git a/integration/consul_catalog_test.go b/integration/consul_catalog_test.go index fdf35b41a..0cd124458 100644 --- a/integration/consul_catalog_test.go +++ b/integration/consul_catalog_test.go @@ -1,6 +1,7 @@ package integration import ( + "bytes" "fmt" "net/http" "time" @@ -41,7 +42,7 @@ func (s *ConsulCatalogSuite) waitToElectConsulLeader() error { leader, err := s.consulClient.Status().Leader() if err != nil || len(leader) == 0 { - return fmt.Errorf("Leader not found. %v", err) + return fmt.Errorf("leader not found. %v", err) } return nil @@ -55,9 +56,6 @@ func (s *ConsulCatalogSuite) createConsulClient(config *api.Config, c *check.C) s.consulClient = consulClient return consulClient } -func (s *ConsulCatalogSuite) startConsulService(c *check.C) { - -} func (s *ConsulCatalogSuite) registerService(name string, address string, port int, tags []string) error { catalog := s.consulClient.Catalog() @@ -80,7 +78,7 @@ func (s *ConsulCatalogSuite) registerService(name string, address string, port i func (s *ConsulCatalogSuite) registerAgentService(name string, address string, port int, tags []string) error { agent := s.consulClient.Agent() - err := agent.ServiceRegister( + return agent.ServiceRegister( &api.AgentServiceRegistration{ ID: address, Tags: tags, @@ -93,13 +91,24 @@ func (s *ConsulCatalogSuite) registerAgentService(name string, address string, p }, }, ) - return err +} + +func (s *ConsulCatalogSuite) registerCheck(name string, address string, port int) error { + agent := s.consulClient.Agent() + checkRegistration := &api.AgentCheckRegistration{ + ID: fmt.Sprintf("%s-%s", name, address), + Name: name, + ServiceID: address, + } + checkRegistration.HTTP = fmt.Sprintf("http://%s:%d/health", address, port) + checkRegistration.Interval = "2s" + checkRegistration.CheckID = address + return agent.CheckRegister(checkRegistration) } func (s *ConsulCatalogSuite) deregisterAgentService(address string) error { agent := s.consulClient.Agent() - err := agent.ServiceDeregister(address) - return err + return agent.ServiceDeregister(address) } func (s *ConsulCatalogSuite) deregisterService(name string, address string) error { @@ -514,3 +523,76 @@ func (s *ConsulCatalogSuite) TestRetryWithConsulServer(c *check.C) { err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) c.Assert(err, checker.IsNil) } + +func (s *ConsulCatalogSuite) TestServiceWithMultipleHealthCheck(c *check.C) { + //Scale consul to 0 to be able to start traefik before and test retry + s.composeProject.Scale(c, "consul", 0) + + cmd, display := s.traefikCmd( + withConfigFile("fixtures/consul_catalog/simple.toml"), + "--consulCatalog", + "--consulCatalog.watch=false", + "--consulCatalog.exposedByDefault=true", + "--consulCatalog.endpoint="+s.consulIP+":8500", + "--consulCatalog.domain=consul.localhost") + defer display(c) + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer cmd.Process.Kill() + + // Wait for Traefik to turn ready. + err = try.GetRequest("http://127.0.0.1:8000/", 2*time.Second, try.StatusCodeIs(http.StatusNotFound)) + c.Assert(err, checker.IsNil) + + req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil) + c.Assert(err, checker.IsNil) + req.Host = "test.consul.localhost" + + // Request should fail + err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusNotFound), try.HasBody()) + c.Assert(err, checker.IsNil) + + // Scale consul to 1 + s.composeProject.Scale(c, "consul", 1) + s.waitToElectConsulLeader() + + whoami := s.composeProject.Container(c, "whoami1") + // Register service + err = s.registerAgentService("test", whoami.NetworkSettings.IPAddress, 80, []string{"name=whoami1"}) + c.Assert(err, checker.IsNil, check.Commentf("Error registering agent service")) + defer s.deregisterAgentService(whoami.NetworkSettings.IPAddress) + + // Register one healthcheck + err = s.registerCheck("test", whoami.NetworkSettings.IPAddress, 80) + c.Assert(err, checker.IsNil, check.Commentf("Error registering check")) + + // Provider consul catalog should be present + err = try.GetRequest("http://127.0.0.1:8080/api/providers", 10*time.Second, try.BodyContains("consul_catalog")) + c.Assert(err, checker.IsNil) + + // Should be ok + err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) + c.Assert(err, checker.IsNil) + + // Change health value of service to critical + reqHealth, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s:80/health", whoami.NetworkSettings.IPAddress), bytes.NewBuffer([]byte("500"))) + c.Assert(err, checker.IsNil) + reqHealth.Host = "test.consul.localhost" + + err = try.Request(reqHealth, 10*time.Second, try.StatusCodeIs(http.StatusOK)) + c.Assert(err, checker.IsNil) + + // Should be a 404 + err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusNotFound), try.HasBody()) + c.Assert(err, checker.IsNil) + + // Change health value of service to passing + reqHealth, err = http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s:80/health", whoami.NetworkSettings.IPAddress), bytes.NewBuffer([]byte("200"))) + c.Assert(err, checker.IsNil) + err = try.Request(reqHealth, 10*time.Second, try.StatusCodeIs(http.StatusOK)) + c.Assert(err, checker.IsNil) + + // Should be a 200 + err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody()) + c.Assert(err, checker.IsNil) +} diff --git a/provider/consulcatalog/consul_catalog.go b/provider/consulcatalog/consul_catalog.go index 65c44ac49..de8927760 100644 --- a/provider/consulcatalog/consul_catalog.go +++ b/provider/consulcatalog/consul_catalog.go @@ -2,6 +2,7 @@ package consulcatalog import ( "errors" + "fmt" "strconv" "strings" "sync" @@ -255,7 +256,7 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s safe.Go(func() { // variable to hold previous state - var flashback []string + var flashback map[string][]string options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} @@ -267,19 +268,28 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s } // Listening to changes that leads to `passing` state or degrades from it. - healthyState, meta, err := health.State("passing", options) + healthyState, meta, err := health.State("any", options) if err != nil { log.WithError(err).Error("Failed to retrieve health checks") notifyError(err) return } - var current []string + var current = make(map[string][]string) + var currentFailing = make(map[string]*api.HealthCheck) if healthyState != nil { for _, healthy := range healthyState { - current = append(current, healthy.ServiceID) + key := fmt.Sprintf("%s-%s", healthy.Node, healthy.ServiceID) + _, failing := currentFailing[key] + if healthy.Status == "passing" && !failing { + current[key] = append(current[key], healthy.Node) + } else { + currentFailing[key] = healthy + if _, ok := current[key]; ok { + delete(current, key) + } + } } - } // If LastIndex didn't change then it means `Get` returned @@ -302,7 +312,7 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s // A critical note is that the return of a blocking request is no guarantee of a change. // It is possible that there was an idempotent write that does not affect the result of the query. // Thus it is required to do extra check for changes... - addedKeys, removedKeys := getChangedStringKeys(current, flashback) + addedKeys, removedKeys, changedKeys := getChangedHealth(current, flashback) if len(addedKeys) > 0 { log.WithField("DiscoveredServices", addedKeys).Debug("Health State change detected.") @@ -315,6 +325,13 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s watchCh <- data flashback = current } + + if len(changedKeys) > 0 { + log.WithField("ChangedServices", changedKeys).Debug("Health State change detected.") + watchCh <- data + flashback = current + } + } } }) @@ -394,6 +411,27 @@ func getChangedStringKeys(currState []string, prevState []string) ([]string, []s return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string) } +func getChangedHealth(current map[string][]string, previous map[string][]string) ([]string, []string, []string) { + currKeySet := fun.Set(fun.Keys(current).([]string)).(map[string]bool) + prevKeySet := fun.Set(fun.Keys(previous).([]string)).(map[string]bool) + + addedKeys := fun.Difference(currKeySet, prevKeySet).(map[string]bool) + removedKeys := fun.Difference(prevKeySet, currKeySet).(map[string]bool) + + var changedKeys []string + + for key, value := range current { + if prevValue, ok := previous[key]; ok { + addedNodesKeys, removedNodesKeys := getChangedStringKeys(value, prevValue) + if len(addedNodesKeys) > 0 || len(removedNodesKeys) > 0 { + changedKeys = append(changedKeys, key) + } + } + } + + return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string), changedKeys +} + func getChangedIntKeys(currState []int, prevState []int) ([]int, []int) { currKeySet := fun.Set(currState).(map[int]bool) prevKeySet := fun.Set(prevState).(map[int]bool)