diff --git a/provider/consul/consul_catalog.go b/provider/consul/consul_catalog.go index 97000ed67..cea07e1b9 100644 --- a/provider/consul/consul_catalog.go +++ b/provider/consul/consul_catalog.go @@ -6,6 +6,7 @@ import ( "sort" "strconv" "strings" + "sync" "text/template" "time" @@ -135,7 +136,7 @@ func getChangedIntKeys(currState []int, prevState []int) ([]int, []int) { return fun.Keys(addedKeys).([]int), fun.Keys(removedKeys).([]int) } -func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) { +func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string, notifyError func(error)) { health := p.client.Health() catalog := p.client.Catalog() @@ -156,7 +157,7 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan< healthyState, meta, err := health.State("passing", options) if err != nil { log.WithError(err).Error("Failed to retrieve health checks") - errorCh <- err + notifyError(err) return } @@ -180,7 +181,7 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan< data, _, err := catalog.Services(&api.QueryOptions{}) if err != nil { log.Errorf("Failed to list services: %s", err) - errorCh <- err + notifyError(err) return } @@ -214,7 +215,7 @@ type Service struct { Ports []int } -func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) { +func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string, notifyError func(error)) { catalog := p.client.Catalog() safe.Go(func() { @@ -233,7 +234,7 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c data, meta, err := catalog.Services(options) if err != nil { log.Errorf("Failed to list services: %s", err) - errorCh <- err + notifyError(err) return } @@ -249,7 +250,7 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c nodes, _, err := catalog.Service(key, "", &api.QueryOptions{}) if err != nil { log.Errorf("Failed to get detail of service %s: %s", key, err) - errorCh <- err + notifyError(err) return } nodesID := getServiceIds(nodes) @@ -572,8 +573,15 @@ func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, st watchCh := make(chan map[string][]string) errorCh := make(chan error) - p.watchHealthState(stopCh, watchCh, errorCh) - p.watchCatalogServices(stopCh, watchCh, errorCh) + var errorOnce sync.Once + notifyError := func(err error) { + errorOnce.Do(func() { + errorCh <- err + }) + } + + p.watchHealthState(stopCh, watchCh, notifyError) + p.watchCatalogServices(stopCh, watchCh, notifyError) defer close(stopCh) defer close(watchCh)