Watch for Consul events to rebuild the dynamic configuration

Co-authored-by: Kevin Pollet <pollet.kevin@gmail.com>
Co-authored-by: Romain <rtribotte@users.noreply.github.com>
This commit is contained in:
JasonWang2016 2022-01-29 00:16:07 +08:00 committed by GitHub
parent 1048348ae6
commit 7543709ecf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 307 additions and 76 deletions

View file

@ -721,3 +721,27 @@ providers:
--providers.consulcatalog.namespace=production --providers.consulcatalog.namespace=production
# ... # ...
``` ```
### `watch`
_Optional, Default=false_
When set to `true`, watches for Consul changes ([Consul watches checks](https://www.consul.io/docs/dynamic-app-config/watches#checks)).
```yaml tab="File (YAML)"
providers:
consulCatalog:
watch: true
# ...
```
```toml tab="File (TOML)"
[providers.consulCatalog]
watch = true
# ...
```
```bash tab="CLI"
--providers.consulcatalog.watch=true
# ...
```

View file

@ -456,6 +456,9 @@ Name of the Traefik service in Consul Catalog (needs to be registered via the or
`--providers.consulcatalog.stale`: `--providers.consulcatalog.stale`:
Use stale consistency for catalog reads. (Default: ```false```) Use stale consistency for catalog reads. (Default: ```false```)
`--providers.consulcatalog.watch`:
Watch Consul API events. (Default: ```false```)
`--providers.docker`: `--providers.docker`:
Enable Docker backend with default settings. (Default: ```false```) Enable Docker backend with default settings. (Default: ```false```)

View file

@ -423,6 +423,9 @@ Name of the Traefik service in Consul Catalog (needs to be registered via the or
`TRAEFIK_PROVIDERS_CONSULCATALOG_STALE`: `TRAEFIK_PROVIDERS_CONSULCATALOG_STALE`:
Use stale consistency for catalog reads. (Default: ```false```) Use stale consistency for catalog reads. (Default: ```false```)
`TRAEFIK_PROVIDERS_CONSULCATALOG_WATCH`:
Watch Consul API events. (Default: ```false```)
`TRAEFIK_PROVIDERS_CONSUL_ENDPOINTS`: `TRAEFIK_PROVIDERS_CONSUL_ENDPOINTS`:
KV store endpoints (Default: ```127.0.0.1:8500```) KV store endpoints (Default: ```127.0.0.1:8500```)

View file

@ -149,6 +149,7 @@
exposedByDefault = true exposedByDefault = true
defaultRule = "foobar" defaultRule = "foobar"
namespace = "foobar" namespace = "foobar"
watch = true
[providers.consulCatalog.endpoint] [providers.consulCatalog.endpoint]
address = "foobar" address = "foobar"
scheme = "foobar" scheme = "foobar"

View file

@ -161,6 +161,7 @@ providers:
exposedByDefault: true exposedByDefault: true
defaultRule: foobar defaultRule: foobar
namespace: foobar namespace: foobar
watch: true
endpoint: endpoint:
address: foobar address: foobar
scheme: foobar scheme: foobar

View file

@ -234,6 +234,80 @@ func (s *ConsulCatalogSuite) TestSimpleConfiguration(c *check.C) {
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)
} }
func (s *ConsulCatalogSuite) TestSimpleConfigurationWithWatch(c *check.C) {
tempObjects := struct {
ConsulAddress string
DefaultRule string
}{
ConsulAddress: s.consulURL,
DefaultRule: "Host(`{{ normalize .Name }}.consul.localhost`)",
}
file := s.adaptFile(c, "fixtures/consul_catalog/simple_watch.toml", tempObjects)
defer os.Remove(file)
reg := &api.AgentServiceRegistration{
ID: "whoami1",
Name: "whoami",
Tags: []string{"traefik.enable=true"},
Port: 80,
Address: s.getComposeServiceIP(c, "whoami1"),
}
err := s.registerService(reg, false)
c.Assert(err, checker.IsNil)
cmd, display := s.traefikCmd(withConfigFile(file))
defer display(c)
err = cmd.Start()
c.Assert(err, checker.IsNil)
defer s.killCmd(cmd)
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil)
c.Assert(err, checker.IsNil)
req.Host = "whoami.consul.localhost"
err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusOK), try.BodyContainsOr("Hostname: whoami1"))
c.Assert(err, checker.IsNil)
err = s.deregisterService("whoami1", false)
c.Assert(err, checker.IsNil)
err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusNotFound))
c.Assert(err, checker.IsNil)
whoamiIP := s.getComposeServiceIP(c, "whoami1")
reg.Check = &api.AgentServiceCheck{
CheckID: "some-ok-check",
TCP: whoamiIP + ":80",
Name: "some-ok-check",
Interval: "1s",
Timeout: "1s",
}
err = s.registerService(reg, false)
c.Assert(err, checker.IsNil)
err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusOK), try.BodyContainsOr("Hostname: whoami1"))
c.Assert(err, checker.IsNil)
reg.Check = &api.AgentServiceCheck{
CheckID: "some-failing-check",
TCP: ":80",
Name: "some-failing-check",
Interval: "1s",
Timeout: "1s",
}
err = s.registerService(reg, false)
c.Assert(err, checker.IsNil)
err = try.Request(req, 2*time.Second, try.StatusCodeIs(http.StatusNotFound))
c.Assert(err, checker.IsNil)
err = s.deregisterService("whoami1", false)
c.Assert(err, checker.IsNil)
}
func (s *ConsulCatalogSuite) TestRegisterServiceWithoutIP(c *check.C) { func (s *ConsulCatalogSuite) TestRegisterServiceWithoutIP(c *check.C) {
tempObjects := struct { tempObjects := struct {
ConsulAddress string ConsulAddress string

View file

@ -0,0 +1,22 @@
[global]
checkNewVersion = false
sendAnonymousUsage = false
[log]
level = "DEBUG"
[entryPoints]
[entryPoints.web]
address = ":8000"
[api]
insecure = true
[providers]
[providers.consulCatalog]
exposedByDefault = true
refreshInterval = "500ms"
defaultRule = "{{ .DefaultRule }}"
watch = true
[providers.consulCatalog.endpoint]
address = "{{ .ConsulAddress }}"

View file

@ -12,8 +12,6 @@ import (
type connectCert struct { type connectCert struct {
root []string root []string
leaf keyPair leaf keyPair
// err is used to propagate to the caller (Provide) any error occurring within the certificate watcher goroutines.
err error
} }
func (c *connectCert) getRoot() []traefiktls.FileOrContent { func (c *connectCert) getRoot() []traefiktls.FileOrContent {

View file

@ -56,10 +56,12 @@ type Provider struct {
ConnectByDefault bool `description:"Consider every service as Connect capable by default." json:"connectByDefault,omitempty" toml:"connectByDefault,omitempty" yaml:"connectByDefault,omitempty" export:"true"` ConnectByDefault bool `description:"Consider every service as Connect capable by default." json:"connectByDefault,omitempty" toml:"connectByDefault,omitempty" yaml:"connectByDefault,omitempty" export:"true"`
ServiceName string `description:"Name of the Traefik service in Consul Catalog (needs to be registered via the orchestrator or manually)." json:"serviceName,omitempty" toml:"serviceName,omitempty" yaml:"serviceName,omitempty" export:"true"` ServiceName string `description:"Name of the Traefik service in Consul Catalog (needs to be registered via the orchestrator or manually)." json:"serviceName,omitempty" toml:"serviceName,omitempty" yaml:"serviceName,omitempty" export:"true"`
Namespace string `description:"Sets the namespace used to discover services (Consul Enterprise only)." json:"namespace,omitempty" toml:"namespace,omitempty" yaml:"namespace,omitempty" export:"true"` Namespace string `description:"Sets the namespace used to discover services (Consul Enterprise only)." json:"namespace,omitempty" toml:"namespace,omitempty" yaml:"namespace,omitempty" export:"true"`
Watch bool `description:"Watch Consul API events." json:"watch,omitempty" toml:"watch,omitempty" yaml:"watch,omitempty" export:"true"`
client *api.Client client *api.Client
defaultRuleTpl *template.Template defaultRuleTpl *template.Template
certChan chan *connectCert certChan chan *connectCert
watchServicesChan chan struct{}
} }
// EndpointConfig holds configurations of the endpoint. // EndpointConfig holds configurations of the endpoint.
@ -98,7 +100,9 @@ func (p *Provider) Init() error {
} }
p.defaultRuleTpl = defaultRuleTpl p.defaultRuleTpl = defaultRuleTpl
p.certChan = make(chan *connectCert) p.certChan = make(chan *connectCert, 1)
p.watchServicesChan = make(chan struct{}, 1)
return nil return nil
} }
@ -107,27 +111,31 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
var err error var err error
p.client, err = createClient(p.Namespace, p.Endpoint) p.client, err = createClient(p.Namespace, p.Endpoint)
if err != nil { if err != nil {
return fmt.Errorf("unable to create consul client: %w", err) return fmt.Errorf("failed to create consul client: %w", err)
} }
if p.ConnectAware {
leafWatcher, rootWatcher, err := p.createConnectTLSWatchers()
if err != nil {
return fmt.Errorf("unable to create consul watch plans: %w", err)
}
pool.GoCtx(func(routineCtx context.Context) {
p.watchConnectTLS(routineCtx, leafWatcher, rootWatcher)
})
}
var certInfo *connectCert
pool.GoCtx(func(routineCtx context.Context) { pool.GoCtx(func(routineCtx context.Context) {
ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "consulcatalog")) ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "consulcatalog"))
logger := log.FromContext(ctxLog) logger := log.FromContext(ctxLog)
operation := func() error { operation := func() error {
var err error ctx, cancel := context.WithCancel(ctxLog)
// When the operation terminates, we want to clean up the
// goroutines in watchConnectTLS and watchServices.
defer cancel()
errChan := make(chan error, 2)
if p.ConnectAware {
go func() {
if err := p.watchConnectTLS(ctx); err != nil {
errChan <- fmt.Errorf("failed to watch connect certificates: %w", err)
}
}()
}
var certInfo *connectCert
// If we are running in connect aware mode then we need to // If we are running in connect aware mode then we need to
// make sure that we obtain the certificates before starting // make sure that we obtain the certificates before starting
@ -137,37 +145,46 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
if p.ConnectAware && !certInfo.isReady() { if p.ConnectAware && !certInfo.isReady() {
logger.Infof("Waiting for Connect certificate before building first configuration") logger.Infof("Waiting for Connect certificate before building first configuration")
select { select {
case <-routineCtx.Done(): case <-ctx.Done():
return nil return nil
case err = <-errChan:
return err
case certInfo = <-p.certChan: case certInfo = <-p.certChan:
if certInfo.err != nil {
return backoff.Permanent(err)
}
} }
} }
// get configuration at the provider's startup. // get configuration at the provider's startup.
err = p.loadConfiguration(ctxLog, certInfo, configurationChan) if err = p.loadConfiguration(ctx, certInfo, configurationChan); err != nil {
if err != nil {
return fmt.Errorf("failed to get consul catalog data: %w", err) return fmt.Errorf("failed to get consul catalog data: %w", err)
} }
// Periodic refreshes. go func() {
ticker := time.NewTicker(time.Duration(p.RefreshInterval)) // Periodic refreshes.
defer ticker.Stop() if !p.Watch {
repeatSend(ctx, time.Duration(p.RefreshInterval), p.watchServicesChan)
return
}
if err := p.watchServices(ctx); err != nil {
errChan <- fmt.Errorf("failed to watch services: %w", err)
}
}()
for { for {
select { select {
case <-routineCtx.Done(): case <-ctx.Done():
return nil return nil
case <-ticker.C:
case err = <-errChan:
return err
case certInfo = <-p.certChan: case certInfo = <-p.certChan:
if certInfo.err != nil { case <-p.watchServicesChan:
return backoff.Permanent(err)
}
} }
err = p.loadConfiguration(ctxLog, certInfo, configurationChan)
if err != nil { if err = p.loadConfiguration(ctx, certInfo, configurationChan); err != nil {
return fmt.Errorf("failed to refresh consul catalog data: %w", err) return fmt.Errorf("failed to refresh consul catalog data: %w", err)
} }
} }
@ -330,6 +347,67 @@ func (p *Provider) fetchService(ctx context.Context, name string, connectEnabled
return consulServices, statuses, err return consulServices, statuses, err
} }
// watchServices watches for update events of the services list and statuses,
// and transmits them to the caller through the p.watchServicesChan.
func (p *Provider) watchServices(ctx context.Context) error {
servicesWatcher, err := watch.Parse(map[string]interface{}{"type": "services"})
if err != nil {
return fmt.Errorf("failed to create services watcher plan: %w", err)
}
servicesWatcher.HybridHandler = func(_ watch.BlockingParamVal, _ interface{}) {
select {
case <-ctx.Done():
case p.watchServicesChan <- struct{}{}:
default:
// Event chan is full, discard event.
}
}
checksWatcher, err := watch.Parse(map[string]interface{}{"type": "checks"})
if err != nil {
return fmt.Errorf("failed to create checks watcher plan: %w", err)
}
checksWatcher.HybridHandler = func(_ watch.BlockingParamVal, _ interface{}) {
select {
case <-ctx.Done():
case p.watchServicesChan <- struct{}{}:
default:
// Event chan is full, discard event.
}
}
logger := hclog.New(&hclog.LoggerOptions{
Name: "consulcatalog",
Level: hclog.LevelFromString(logrus.GetLevel().String()),
JSONFormat: true,
})
errChan := make(chan error, 2)
defer func() {
servicesWatcher.Stop()
checksWatcher.Stop()
}()
go func() {
errChan <- servicesWatcher.RunWithClientAndHclog(p.client, logger)
}()
go func() {
errChan <- checksWatcher.RunWithClientAndHclog(p.client, logger)
}()
select {
case <-ctx.Done():
return nil
case err = <-errChan:
return fmt.Errorf("services or checks watcher terminated: %w", err)
}
}
func rootsWatchHandler(ctx context.Context, dest chan<- []string) func(watch.BlockingParamVal, interface{}) { func rootsWatchHandler(ctx context.Context, dest chan<- []string) func(watch.BlockingParamVal, interface{}) {
return func(_ watch.BlockingParamVal, raw interface{}) { return func(_ watch.BlockingParamVal, raw interface{}) {
if raw == nil { if raw == nil {
@ -348,7 +426,10 @@ func rootsWatchHandler(ctx context.Context, dest chan<- []string) func(watch.Blo
roots = append(roots, root.RootCertPEM) roots = append(roots, root.RootCertPEM)
} }
dest <- roots select {
case <-ctx.Done():
case dest <- roots:
}
} }
} }
@ -370,65 +451,59 @@ func leafWatcherHandler(ctx context.Context, dest chan<- keyPair) func(watch.Blo
return return
} }
dest <- keyPair{ kp := keyPair{
cert: v.CertPEM, cert: v.CertPEM,
key: v.PrivateKeyPEM, key: v.PrivateKeyPEM,
} }
select {
case <-ctx.Done():
case dest <- kp:
}
} }
} }
func (p *Provider) createConnectTLSWatchers() (*watch.Plan, *watch.Plan, error) { // watchConnectTLS watches for updates of the root certificate or the leaf
// certificate, and transmits them to the caller via p.certChan.
func (p *Provider) watchConnectTLS(ctx context.Context) error {
leafChan := make(chan keyPair)
leafWatcher, err := watch.Parse(map[string]interface{}{ leafWatcher, err := watch.Parse(map[string]interface{}{
"type": "connect_leaf", "type": "connect_leaf",
"service": p.ServiceName, "service": p.ServiceName,
}) })
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to create leaf cert watcher plan: %w", err) return fmt.Errorf("failed to create leaf cert watcher plan: %w", err)
} }
leafWatcher.HybridHandler = leafWatcherHandler(ctx, leafChan)
rootWatcher, err := watch.Parse(map[string]interface{}{ rootsChan := make(chan []string)
rootsWatcher, err := watch.Parse(map[string]interface{}{
"type": "connect_roots", "type": "connect_roots",
}) })
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to create root cert watcher plan: %w", err) return fmt.Errorf("failed to create roots cert watcher plan: %w", err)
} }
rootsWatcher.HybridHandler = rootsWatchHandler(ctx, rootsChan)
return leafWatcher, rootWatcher, nil hclogger := hclog.New(&hclog.LoggerOptions{
}
// watchConnectTLS watches for updates of the root certificate or the leaf
// certificate, and transmits them to the caller via p.certChan. Any error is also
// propagated up through p.certChan, in connectCert.err.
func (p *Provider) watchConnectTLS(ctx context.Context, leafWatcher *watch.Plan, rootWatcher *watch.Plan) {
ctxLog := log.With(ctx, log.Str(log.ProviderName, "consulcatalog"))
logger := log.FromContext(ctxLog)
leafChan := make(chan keyPair)
rootChan := make(chan []string)
leafWatcher.HybridHandler = leafWatcherHandler(ctx, leafChan)
rootWatcher.HybridHandler = rootsWatchHandler(ctx, rootChan)
logOpts := &hclog.LoggerOptions{
Name: "consulcatalog", Name: "consulcatalog",
Level: hclog.LevelFromString(logrus.GetLevel().String()), Level: hclog.LevelFromString(logrus.GetLevel().String()),
JSONFormat: true, JSONFormat: true,
} })
hclogger := hclog.New(logOpts) errChan := make(chan error, 2)
go func() { defer func() {
err := leafWatcher.RunWithClientAndHclog(p.client, hclogger) leafWatcher.Stop()
if err != nil { rootsWatcher.Stop()
p.certChan <- &connectCert{err: err}
}
}() }()
go func() { go func() {
err := rootWatcher.RunWithClientAndHclog(p.client, hclogger) errChan <- leafWatcher.RunWithClientAndHclog(p.client, hclogger)
if err != nil { }()
p.certChan <- &connectCert{err: err}
} go func() {
errChan <- rootsWatcher.RunWithClientAndHclog(p.client, hclogger)
}() }()
var ( var (
@ -440,20 +515,28 @@ func (p *Provider) watchConnectTLS(ctx context.Context, leafWatcher *watch.Plan,
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
leafWatcher.Stop() return nil
rootWatcher.Stop()
return case err := <-errChan:
case rootCerts = <-rootChan: return fmt.Errorf("leaf or roots watcher terminated: %w", err)
case rootCerts = <-rootsChan:
case leafCerts = <-leafChan: case leafCerts = <-leafChan:
} }
newCertInfo := &connectCert{ newCertInfo := &connectCert{
root: rootCerts, root: rootCerts,
leaf: leafCerts, leaf: leafCerts,
} }
if newCertInfo.isReady() && !newCertInfo.equals(certInfo) { if newCertInfo.isReady() && !newCertInfo.equals(certInfo) {
logger.Debugf("Updating connect certs for service %s", p.ServiceName) log.FromContext(ctx).Debugf("Updating connect certs for service %s", p.ServiceName)
certInfo = newCertInfo certInfo = newCertInfo
p.certChan <- newCertInfo
select {
case <-ctx.Done():
case p.certChan <- newCertInfo:
}
} }
} }
} }
@ -487,3 +570,25 @@ func createClient(namespace string, endpoint *EndpointConfig) (*api.Client, erro
return api.NewClient(&config) return api.NewClient(&config)
} }
func repeatSend(ctx context.Context, interval time.Duration, c chan<- struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
select {
case <-ctx.Done():
return
case c <- struct{}{}:
default:
// Chan is full, discard event.
}
}
}
}