Add support for stale reads from Consul catalog
This commit is contained in:
parent
9e012a6b54
commit
886cc83ad9
3 changed files with 14 additions and 5 deletions
|
@ -108,6 +108,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
|
||||||
defaultConsulCatalog.Constraints = types.Constraints{}
|
defaultConsulCatalog.Constraints = types.Constraints{}
|
||||||
defaultConsulCatalog.Prefix = "traefik"
|
defaultConsulCatalog.Prefix = "traefik"
|
||||||
defaultConsulCatalog.FrontEndRule = "Host:{{.ServiceName}}.{{.Domain}}"
|
defaultConsulCatalog.FrontEndRule = "Host:{{.ServiceName}}.{{.Domain}}"
|
||||||
|
defaultConsulCatalog.Stale = false
|
||||||
|
|
||||||
// default Etcd
|
// default Etcd
|
||||||
var defaultEtcd etcd.Provider
|
var defaultEtcd etcd.Provider
|
||||||
|
|
|
@ -24,6 +24,13 @@ endpoint = "127.0.0.1:8500"
|
||||||
#
|
#
|
||||||
exposedByDefault = false
|
exposedByDefault = false
|
||||||
|
|
||||||
|
# Allow Consul server to serve the catalog reads regardless of whether it is the leader.
|
||||||
|
#
|
||||||
|
# Optional
|
||||||
|
# Default: false
|
||||||
|
#
|
||||||
|
stale = false
|
||||||
|
|
||||||
# Default domain used.
|
# Default domain used.
|
||||||
#
|
#
|
||||||
# Optional
|
# Optional
|
||||||
|
|
|
@ -32,6 +32,7 @@ type Provider struct {
|
||||||
provider.BaseProvider `mapstructure:",squash" export:"true"`
|
provider.BaseProvider `mapstructure:",squash" export:"true"`
|
||||||
Endpoint string `description:"Consul server endpoint"`
|
Endpoint string `description:"Consul server endpoint"`
|
||||||
Domain string `description:"Default domain used"`
|
Domain string `description:"Default domain used"`
|
||||||
|
Stale bool `description:"Use stale consistency for catalog reads" export:"true"`
|
||||||
ExposedByDefault bool `description:"Expose Consul services by default" export:"true"`
|
ExposedByDefault bool `description:"Expose Consul services by default" export:"true"`
|
||||||
Prefix string `description:"Prefix used for Consul catalog tags" export:"true"`
|
Prefix string `description:"Prefix used for Consul catalog tags" export:"true"`
|
||||||
FrontEndRule string `description:"Frontend rule used for Consul services" export:"true"`
|
FrontEndRule string `description:"Frontend rule used for Consul services" export:"true"`
|
||||||
|
@ -186,7 +187,7 @@ func (p *Provider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- m
|
||||||
// variable to hold previous state
|
// variable to hold previous state
|
||||||
var flashback map[string]Service
|
var flashback map[string]Service
|
||||||
|
|
||||||
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
|
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime, AllowStale: p.Stale}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -211,7 +212,7 @@ func (p *Provider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- m
|
||||||
if data != nil {
|
if data != nil {
|
||||||
current := make(map[string]Service)
|
current := make(map[string]Service)
|
||||||
for key, value := range data {
|
for key, value := range data {
|
||||||
nodes, _, err := catalog.Service(key, "", &api.QueryOptions{})
|
nodes, _, err := catalog.Service(key, "", &api.QueryOptions{AllowStale: p.Stale})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to get detail of service %s: %v", key, err)
|
log.Errorf("Failed to get detail of service %s: %v", key, err)
|
||||||
notifyError(err)
|
notifyError(err)
|
||||||
|
@ -259,7 +260,7 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
|
||||||
var flashback map[string][]string
|
var flashback map[string][]string
|
||||||
var flashbackMaintenance []string
|
var flashbackMaintenance []string
|
||||||
|
|
||||||
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
|
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime, AllowStale: p.Stale}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -305,7 +306,7 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
|
||||||
options.WaitIndex = meta.LastIndex
|
options.WaitIndex = meta.LastIndex
|
||||||
|
|
||||||
// The response should be unified with watchCatalogServices
|
// The response should be unified with watchCatalogServices
|
||||||
data, _, err := catalog.Services(&api.QueryOptions{})
|
data, _, err := catalog.Services(&api.QueryOptions{AllowStale: p.Stale})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to list services: %v", err)
|
log.Errorf("Failed to list services: %v", err)
|
||||||
notifyError(err)
|
notifyError(err)
|
||||||
|
@ -473,7 +474,7 @@ func getServiceAddresses(services []*api.CatalogService) []string {
|
||||||
|
|
||||||
func (p *Provider) healthyNodes(service string) (catalogUpdate, error) {
|
func (p *Provider) healthyNodes(service string) (catalogUpdate, error) {
|
||||||
health := p.client.Health()
|
health := p.client.Health()
|
||||||
data, _, err := health.Service(service, "", true, &api.QueryOptions{})
|
data, _, err := health.Service(service, "", true, &api.QueryOptions{AllowStale: p.Stale})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Errorf("Failed to fetch details of %s", service)
|
log.WithError(err).Errorf("Failed to fetch details of %s", service)
|
||||||
return catalogUpdate{}, err
|
return catalogUpdate{}, err
|
||||||
|
|
Loading…
Reference in a new issue