diff --git a/provider/consul/consul_catalog.go b/provider/consul/consul_catalog.go index 603374bea..74c08b60e 100644 --- a/provider/consul/consul_catalog.go +++ b/provider/consul/consul_catalog.go @@ -76,18 +76,25 @@ func (a nodeSorter) Less(i int, j int) bool { return lentr.Service.Port < rentr.Service.Port } -func (p *CatalogProvider) watchServices(stopCh <-chan struct{}) <-chan map[string][]string { - watchCh := make(chan map[string][]string) +func getChangedKeys(currState map[string][]string, prevState map[string][]string) ([]string, []string) { + currKeySet := fun.Set(fun.Keys(currState).([]string)).(map[string]bool) + prevKeySet := fun.Set(fun.Keys(prevState).([]string)).(map[string]bool) - catalog := p.client.Catalog() + addedKeys := fun.Difference(currKeySet, prevKeySet).(map[string]bool) + removedKeys := fun.Difference(prevKeySet, currKeySet).(map[string]bool) + + return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string) +} + +func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string) { health := p.client.Health() - var lastHealthIndex uint64 + catalog := p.client.Catalog() safe.Go(func() { - defer close(watchCh) + // variable to hold previous state + var flashback map[string][]string - catalogOptions := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} - healthOptions := &api.QueryOptions{} + options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} for { select { @@ -96,16 +103,10 @@ func (p *CatalogProvider) watchServices(stopCh <-chan struct{}) <-chan map[strin default: } - data, catalogMeta, err := catalog.Services(catalogOptions) - if err != nil { - log.WithError(err).Error("Failed to list services") - return - } - // Listening to changes that leads to `passing` state or degrades from it. // The call is used just as a trigger for further actions // (intentionally there is no interest in the received data). - _, healthMeta, err := health.State("passing", healthOptions) + _, meta, err := health.State("passing", options) if err != nil { log.WithError(err).Error("Failed to retrieve health checks") return @@ -113,21 +114,89 @@ func (p *CatalogProvider) watchServices(stopCh <-chan struct{}) <-chan map[strin // If LastIndex didn't change then it means `Get` returned // because of the WaitTime and the key didn't changed. - sameServiceAmount := catalogOptions.WaitIndex == catalogMeta.LastIndex - sameServiceHealth := lastHealthIndex == healthMeta.LastIndex - if sameServiceAmount && sameServiceHealth { + if options.WaitIndex == meta.LastIndex { continue } - catalogOptions.WaitIndex = catalogMeta.LastIndex - lastHealthIndex = healthMeta.LastIndex + + options.WaitIndex = meta.LastIndex + + // The response should be unified with watchCatalogServices + data, _, err := catalog.Services(&api.QueryOptions{}) + if err != nil { + log.Errorf("Failed to list services: %s", err) + return + } if data != nil { - watchCh <- data + // A critical note is that the return of a blocking request is no guarantee of a change. + // It is possible that there was an idempotent write that does not affect the result of the query. + // Thus it is required to do extra check for changes... + addedKeys, removedKeys := getChangedKeys(data, flashback) + + if len(addedKeys) > 0 { + log.WithField("DiscoveredServices", addedKeys).Debug("Health State change detected.") + watchCh <- data + flashback = data + } + + if len(removedKeys) > 0 { + log.WithField("MissingServices", removedKeys).Debug("Health State change detected.") + watchCh <- data + flashback = data + } } } }) +} - return watchCh +func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string) { + catalog := p.client.Catalog() + + safe.Go(func() { + // variable to hold previous state + var flashback map[string][]string + + options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} + + for { + select { + case <-stopCh: + return + default: + } + + data, meta, err := catalog.Services(options) + if err != nil { + log.Errorf("Failed to list services: %s", err) + return + } + + if options.WaitIndex == meta.LastIndex { + continue + } + + options.WaitIndex = meta.LastIndex + + if data != nil { + // A critical note is that the return of a blocking request is no guarantee of a change. + // It is possible that there was an idempotent write that does not affect the result of the query. + // Thus it is required to do extra check for changes... + addedKeys, removedKeys := getChangedKeys(data, flashback) + + if len(addedKeys) > 0 { + log.WithField("DiscoveredServices", addedKeys).Debug("Catalog Services change detected.") + watchCh <- data + flashback = data + } + + if len(removedKeys) > 0 { + log.WithField("MissingServices", removedKeys).Debug("Catalog Services change detected.") + watchCh <- data + flashback = data + } + } + } + }) } func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) { @@ -357,15 +426,19 @@ func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate, func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error { stopCh := make(chan struct{}) - serviceCatalog := p.watchServices(stopCh) + watchCh := make(chan map[string][]string) + + p.watchHealthState(stopCh, watchCh) + p.watchCatalogServices(stopCh, watchCh) defer close(stopCh) + defer close(watchCh) for { select { case <-stop: return nil - case index, ok := <-serviceCatalog: + case index, ok := <-watchCh: if !ok { return errors.New("Consul service list nil") } diff --git a/provider/consul/consul_catalog_test.go b/provider/consul/consul_catalog_test.go index 325c6338e..e92269b6e 100644 --- a/provider/consul/consul_catalog_test.go +++ b/provider/consul/consul_catalog_test.go @@ -6,6 +6,7 @@ import ( "testing" "text/template" + "github.com/BurntSushi/ty/fun" "github.com/containous/traefik/types" "github.com/hashicorp/consul/api" ) @@ -606,3 +607,146 @@ func TestConsulCatalogNodeSorter(t *testing.T) { } } } + +func TestConsulCatalogGetChangedKeys(t *testing.T) { + type Input struct { + currState map[string][]string + prevState map[string][]string + } + + type Output struct { + addedKeys []string + removedKeys []string + } + + cases := []struct { + input Input + output Output + }{ + { + input: Input{ + currState: map[string][]string{ + "foo-service": {"v1"}, + "bar-service": {"v1"}, + "baz-service": {"v1"}, + "qux-service": {"v1"}, + "quux-service": {"v1"}, + "quuz-service": {"v1"}, + "corge-service": {"v1"}, + "grault-service": {"v1"}, + "garply-service": {"v1"}, + "waldo-service": {"v1"}, + "fred-service": {"v1"}, + "plugh-service": {"v1"}, + "xyzzy-service": {"v1"}, + "thud-service": {"v1"}, + }, + prevState: map[string][]string{ + "foo-service": {"v1"}, + "bar-service": {"v1"}, + "baz-service": {"v1"}, + "qux-service": {"v1"}, + "quux-service": {"v1"}, + "quuz-service": {"v1"}, + "corge-service": {"v1"}, + "grault-service": {"v1"}, + "garply-service": {"v1"}, + "waldo-service": {"v1"}, + "fred-service": {"v1"}, + "plugh-service": {"v1"}, + "xyzzy-service": {"v1"}, + "thud-service": {"v1"}, + }, + }, + output: Output{ + addedKeys: []string{}, + removedKeys: []string{}, + }, + }, + { + input: Input{ + currState: map[string][]string{ + "foo-service": {"v1"}, + "bar-service": {"v1"}, + "baz-service": {"v1"}, + "qux-service": {"v1"}, + "quux-service": {"v1"}, + "quuz-service": {"v1"}, + "corge-service": {"v1"}, + "grault-service": {"v1"}, + "garply-service": {"v1"}, + "waldo-service": {"v1"}, + "fred-service": {"v1"}, + "plugh-service": {"v1"}, + "xyzzy-service": {"v1"}, + "thud-service": {"v1"}, + }, + prevState: map[string][]string{ + "foo-service": {"v1"}, + "bar-service": {"v1"}, + "baz-service": {"v1"}, + "corge-service": {"v1"}, + "grault-service": {"v1"}, + "garply-service": {"v1"}, + "waldo-service": {"v1"}, + "fred-service": {"v1"}, + "plugh-service": {"v1"}, + "xyzzy-service": {"v1"}, + "thud-service": {"v1"}, + }, + }, + output: Output{ + addedKeys: []string{"qux-service", "quux-service", "quuz-service"}, + removedKeys: []string{}, + }, + }, + { + input: Input{ + currState: map[string][]string{ + "foo-service": {"v1"}, + "qux-service": {"v1"}, + "quux-service": {"v1"}, + "quuz-service": {"v1"}, + "corge-service": {"v1"}, + "grault-service": {"v1"}, + "garply-service": {"v1"}, + "waldo-service": {"v1"}, + "fred-service": {"v1"}, + "plugh-service": {"v1"}, + "xyzzy-service": {"v1"}, + "thud-service": {"v1"}, + }, + prevState: map[string][]string{ + "foo-service": {"v1"}, + "bar-service": {"v1"}, + "baz-service": {"v1"}, + "qux-service": {"v1"}, + "quux-service": {"v1"}, + "quuz-service": {"v1"}, + "corge-service": {"v1"}, + "waldo-service": {"v1"}, + "fred-service": {"v1"}, + "plugh-service": {"v1"}, + "xyzzy-service": {"v1"}, + "thud-service": {"v1"}, + }, + }, + output: Output{ + addedKeys: []string{"grault-service", "garply-service"}, + removedKeys: []string{"bar-service", "baz-service"}, + }, + }, + } + + for _, c := range cases { + addedKeys, removedKeys := getChangedKeys(c.input.currState, c.input.prevState) + + if !reflect.DeepEqual(fun.Set(addedKeys), fun.Set(c.output.addedKeys)) { + t.Fatalf("Added keys comparison results: got %q, want %q", addedKeys, c.output.addedKeys) + } + + if !reflect.DeepEqual(fun.Set(removedKeys), fun.Set(c.output.removedKeys)) { + t.Fatalf("Removed keys comparison results: got %q, want %q", removedKeys, c.output.removedKeys) + } + } +}