2019-10-15 15:34:08 +00:00
package consulcatalog
import (
"context"
"fmt"
"strconv"
"text/template"
"time"
2020-02-26 09:36:05 +00:00
"github.com/cenkalti/backoff/v4"
2019-10-15 15:34:08 +00:00
"github.com/hashicorp/consul/api"
2021-07-15 12:02:11 +00:00
"github.com/hashicorp/consul/api/watch"
"github.com/hashicorp/go-hclog"
"github.com/sirupsen/logrus"
2020-08-17 16:04:03 +00:00
ptypes "github.com/traefik/paerser/types"
2020-09-16 13:46:04 +00:00
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/job"
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/provider"
"github.com/traefik/traefik/v2/pkg/provider/constraints"
"github.com/traefik/traefik/v2/pkg/safe"
"github.com/traefik/traefik/v2/pkg/types"
2019-10-15 15:34:08 +00:00
)
2022-06-03 10:00:09 +00:00
// defaultTemplateRule is the default template for the default rule.
const defaultTemplateRule = "Host(`{{ normalize .Name }}`)"
// providerName is the Consul Catalog provider name.
const providerName = "consulcatalog"
2019-10-15 15:34:08 +00:00
var _ provider . Provider = ( * Provider ) ( nil )
type itemData struct {
2021-07-15 12:02:11 +00:00
ID string
Node string
Datacenter string
Name string
Namespace string
Address string
Port string
Status string
Labels map [ string ] string
Tags [ ] string
ExtraConf configuration
2019-10-15 15:34:08 +00:00
}
2022-06-03 10:00:09 +00:00
// ProviderBuilder is responsible for constructing namespaced instances of the Consul Catalog provider.
type ProviderBuilder struct {
2022-07-12 10:12:08 +00:00
Configuration ` yaml:",inline" export:"true" `
2022-06-03 10:00:09 +00:00
// Deprecated: use Namespaces option instead.
Namespace string ` description:"Sets the namespace used to discover services (Consul Enterprise only)." json:"namespace,omitempty" toml:"namespace,omitempty" yaml:"namespace,omitempty" `
Namespaces [ ] string ` description:"Sets the namespaces used to discover services (Consul Enterprise only)." json:"namespaces,omitempty" toml:"namespaces,omitempty" yaml:"namespaces,omitempty" `
}
// BuildProviders builds Consul Catalog provider instances for the given namespaces configuration.
func ( p * ProviderBuilder ) BuildProviders ( ) [ ] * Provider {
// We can warn about that, because we've already made sure before that
// Namespace and Namespaces are mutually exclusive.
if p . Namespace != "" {
log . WithoutContext ( ) . Warnf ( "Namespace option is deprecated, please use the Namespaces option instead." )
}
if len ( p . Namespaces ) == 0 {
return [ ] * Provider { {
Configuration : p . Configuration ,
name : providerName ,
// p.Namespace could very well be empty.
namespace : p . Namespace ,
} }
}
var providers [ ] * Provider
for _ , namespace := range p . Namespaces {
providers = append ( providers , & Provider {
Configuration : p . Configuration ,
name : providerName + "-" + namespace ,
namespace : namespace ,
} )
}
return providers
}
// Configuration represents the Consul Catalog provider configuration.
type Configuration struct {
2019-10-31 10:56:05 +00:00
Constraints string ` description:"Constraints is an expression that Traefik matches against the container's labels to determine whether to create any route for that container." json:"constraints,omitempty" toml:"constraints,omitempty" yaml:"constraints,omitempty" export:"true" `
Endpoint * EndpointConfig ` description:"Consul endpoint settings" json:"endpoint,omitempty" toml:"endpoint,omitempty" yaml:"endpoint,omitempty" export:"true" `
2022-06-03 10:00:09 +00:00
Prefix string ` description:"Prefix for consul service tags." json:"prefix,omitempty" toml:"prefix,omitempty" yaml:"prefix,omitempty" export:"true" `
RefreshInterval ptypes . Duration ` description:"Interval for check Consul API." json:"refreshInterval,omitempty" toml:"refreshInterval,omitempty" yaml:"refreshInterval,omitempty" export:"true" `
2019-10-31 10:56:05 +00:00
RequireConsistent bool ` description:"Forces the read to be fully consistent." json:"requireConsistent,omitempty" toml:"requireConsistent,omitempty" yaml:"requireConsistent,omitempty" export:"true" `
Stale bool ` description:"Use stale consistency for catalog reads." json:"stale,omitempty" toml:"stale,omitempty" yaml:"stale,omitempty" export:"true" `
Cache bool ` description:"Use local agent caching for catalog reads." json:"cache,omitempty" toml:"cache,omitempty" yaml:"cache,omitempty" export:"true" `
ExposedByDefault bool ` description:"Expose containers by default." json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:"exposedByDefault,omitempty" export:"true" `
DefaultRule string ` description:"Default rule." json:"defaultRule,omitempty" toml:"defaultRule,omitempty" yaml:"defaultRule,omitempty" `
2021-07-15 12:02:11 +00:00
ConnectAware bool ` description:"Enable Consul Connect support." json:"connectAware,omitempty" toml:"connectAware,omitempty" yaml:"connectAware,omitempty" export:"true" `
ConnectByDefault bool ` description:"Consider every service as Connect capable by default." json:"connectByDefault,omitempty" toml:"connectByDefault,omitempty" yaml:"connectByDefault,omitempty" export:"true" `
ServiceName string ` description:"Name of the Traefik service in Consul Catalog (needs to be registered via the orchestrator or manually)." json:"serviceName,omitempty" toml:"serviceName,omitempty" yaml:"serviceName,omitempty" export:"true" `
2022-01-28 16:16:07 +00:00
Watch bool ` description:"Watch Consul API events." json:"watch,omitempty" toml:"watch,omitempty" yaml:"watch,omitempty" export:"true" `
2022-06-03 10:00:09 +00:00
}
// SetDefaults sets the default values.
func ( c * Configuration ) SetDefaults ( ) {
c . Endpoint = & EndpointConfig { }
c . RefreshInterval = ptypes . Duration ( 15 * time . Second )
c . Prefix = "traefik"
c . ExposedByDefault = true
c . DefaultRule = defaultTemplateRule
c . ServiceName = "traefik"
}
// Provider is the Consul Catalog provider implementation.
type Provider struct {
Configuration
2019-10-15 15:34:08 +00:00
2022-06-03 10:00:09 +00:00
name string
namespace string
2022-01-28 16:16:07 +00:00
client * api . Client
defaultRuleTpl * template . Template
certChan chan * connectCert
watchServicesChan chan struct { }
2019-10-15 15:34:08 +00:00
}
// EndpointConfig holds configurations of the endpoint.
type EndpointConfig struct {
2020-10-30 11:44:05 +00:00
Address string ` description:"The address of the Consul server" json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty" `
Scheme string ` description:"The URI scheme for the Consul server" json:"scheme,omitempty" toml:"scheme,omitempty" yaml:"scheme,omitempty" `
DataCenter string ` description:"Data center to use. If not provided, the default agent data center is used" json:"datacenter,omitempty" toml:"datacenter,omitempty" yaml:"datacenter,omitempty" `
2022-01-24 10:08:05 +00:00
Token string ` description:"Token is used to provide a per-request ACL token which overrides the agent's default token" json:"token,omitempty" toml:"token,omitempty" yaml:"token,omitempty" loggable:"false" `
2019-10-15 15:34:08 +00:00
TLS * types . ClientTLS ` description:"Enable TLS support." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true" `
HTTPAuth * EndpointHTTPAuthConfig ` description:"Auth info to use for http access" json:"httpAuth,omitempty" toml:"httpAuth,omitempty" yaml:"httpAuth,omitempty" export:"true" `
2020-08-17 16:04:03 +00:00
EndpointWaitTime ptypes . Duration ` description:"WaitTime limits how long a Watch will block. If not provided, the agent default values will be used" json:"endpointWaitTime,omitempty" toml:"endpointWaitTime,omitempty" yaml:"endpointWaitTime,omitempty" export:"true" `
2019-10-15 15:34:08 +00:00
}
// EndpointHTTPAuthConfig holds configurations of the authentication.
type EndpointHTTPAuthConfig struct {
2022-01-24 10:08:05 +00:00
Username string ` description:"Basic Auth username" json:"username,omitempty" toml:"username,omitempty" yaml:"username,omitempty" loggable:"false" `
Password string ` description:"Basic Auth password" json:"password,omitempty" toml:"password,omitempty" yaml:"password,omitempty" loggable:"false" `
2019-10-15 15:34:08 +00:00
}
// Init the provider.
func ( p * Provider ) Init ( ) error {
defaultRuleTpl , err := provider . MakeDefaultRuleTemplate ( p . DefaultRule , nil )
if err != nil {
2020-05-11 10:06:07 +00:00
return fmt . Errorf ( "error while parsing default rule: %w" , err )
2019-10-15 15:34:08 +00:00
}
p . defaultRuleTpl = defaultRuleTpl
2022-01-28 16:16:07 +00:00
p . certChan = make ( chan * connectCert , 1 )
p . watchServicesChan = make ( chan struct { } , 1 )
2022-06-03 10:00:09 +00:00
// In case they didn't initialize Provider with BuildProviders.
if p . name == "" {
p . name = providerName
}
2019-10-15 15:34:08 +00:00
return nil
}
// Provide allows the consul catalog provider to provide configurations to traefik using the given configuration channel.
func ( p * Provider ) Provide ( configurationChan chan <- dynamic . Message , pool * safe . Pool ) error {
2021-07-15 12:02:11 +00:00
var err error
2022-06-03 10:00:09 +00:00
p . client , err = createClient ( p . namespace , p . Endpoint )
2021-07-15 12:02:11 +00:00
if err != nil {
2022-01-28 16:16:07 +00:00
return fmt . Errorf ( "failed to create consul client: %w" , err )
2021-07-15 12:02:11 +00:00
}
2019-10-15 15:34:08 +00:00
pool . GoCtx ( func ( routineCtx context . Context ) {
2022-06-03 10:00:09 +00:00
ctxLog := log . With ( routineCtx , log . Str ( log . ProviderName , p . name ) )
2019-10-15 15:34:08 +00:00
logger := log . FromContext ( ctxLog )
operation := func ( ) error {
2022-01-28 16:16:07 +00:00
ctx , cancel := context . WithCancel ( ctxLog )
// When the operation terminates, we want to clean up the
// goroutines in watchConnectTLS and watchServices.
defer cancel ( )
errChan := make ( chan error , 2 )
if p . ConnectAware {
go func ( ) {
if err := p . watchConnectTLS ( ctx ) ; err != nil {
errChan <- fmt . Errorf ( "failed to watch connect certificates: %w" , err )
}
} ( )
}
var certInfo * connectCert
2019-10-15 15:34:08 +00:00
2021-07-15 12:02:11 +00:00
// If we are running in connect aware mode then we need to
// make sure that we obtain the certificates before starting
// the service watcher, otherwise a connect enabled service
// that gets resolved before the certificates are available
// will cause an error condition.
if p . ConnectAware && ! certInfo . isReady ( ) {
logger . Infof ( "Waiting for Connect certificate before building first configuration" )
select {
2022-01-28 16:16:07 +00:00
case <- ctx . Done ( ) :
2021-07-15 12:02:11 +00:00
return nil
2022-01-28 16:16:07 +00:00
case err = <- errChan :
return err
2021-07-15 12:02:11 +00:00
case certInfo = <- p . certChan :
}
2019-10-15 15:34:08 +00:00
}
2020-11-16 19:44:04 +00:00
// get configuration at the provider's startup.
2022-01-28 16:16:07 +00:00
if err = p . loadConfiguration ( ctx , certInfo , configurationChan ) ; err != nil {
2020-11-16 19:44:04 +00:00
return fmt . Errorf ( "failed to get consul catalog data: %w" , err )
}
2022-01-28 16:16:07 +00:00
go func ( ) {
// Periodic refreshes.
if ! p . Watch {
repeatSend ( ctx , time . Duration ( p . RefreshInterval ) , p . watchServicesChan )
return
}
if err := p . watchServices ( ctx ) ; err != nil {
errChan <- fmt . Errorf ( "failed to watch services: %w" , err )
}
} ( )
2019-10-15 15:34:08 +00:00
for {
select {
2022-01-28 16:16:07 +00:00
case <- ctx . Done ( ) :
2019-10-15 15:34:08 +00:00
return nil
2022-01-28 16:16:07 +00:00
case err = <- errChan :
return err
2021-07-15 12:02:11 +00:00
case certInfo = <- p . certChan :
2022-01-28 16:16:07 +00:00
case <- p . watchServicesChan :
2021-07-15 12:02:11 +00:00
}
2022-01-28 16:16:07 +00:00
if err = p . loadConfiguration ( ctx , certInfo , configurationChan ) ; err != nil {
2021-07-15 12:02:11 +00:00
return fmt . Errorf ( "failed to refresh consul catalog data: %w" , err )
2019-10-15 15:34:08 +00:00
}
}
}
notify := func ( err error , time time . Duration ) {
logger . Errorf ( "Provider connection error %+v, retrying in %s" , err , time )
}
err := backoff . RetryNotify ( safe . OperationWithRecover ( operation ) , backoff . WithContext ( job . NewBackOff ( backoff . NewExponentialBackOff ( ) ) , ctxLog ) , notify )
if err != nil {
2019-10-31 10:56:05 +00:00
logger . Errorf ( "Cannot connect to consul catalog server %+v" , err )
2019-10-15 15:34:08 +00:00
}
} )
return nil
}
2021-07-15 12:02:11 +00:00
func ( p * Provider ) loadConfiguration ( ctx context . Context , certInfo * connectCert , configurationChan chan <- dynamic . Message ) error {
2020-11-16 19:44:04 +00:00
data , err := p . getConsulServicesData ( ctx )
if err != nil {
return err
}
configurationChan <- dynamic . Message {
2022-06-03 10:00:09 +00:00
ProviderName : p . name ,
2021-07-15 12:02:11 +00:00
Configuration : p . buildConfiguration ( ctx , data , certInfo ) ,
2020-11-16 19:44:04 +00:00
}
return nil
}
2019-10-15 15:34:08 +00:00
func ( p * Provider ) getConsulServicesData ( ctx context . Context ) ( [ ] itemData , error ) {
2021-07-15 12:02:11 +00:00
// The query option "Filter" is not supported by /catalog/services.
// https://www.consul.io/api/catalog.html#list-services
2022-01-24 14:30:05 +00:00
opts := & api . QueryOptions { AllowStale : p . Stale , RequireConsistent : p . RequireConsistent , UseCache : p . Cache }
2021-12-03 18:30:07 +00:00
opts = opts . WithContext ( ctx )
2021-07-15 12:02:11 +00:00
serviceNames , _ , err := p . client . Catalog ( ) . Services ( opts )
2019-10-15 15:34:08 +00:00
if err != nil {
return nil , err
}
var data [ ] itemData
2021-07-15 12:02:11 +00:00
for name , tags := range serviceNames {
logger := log . FromContext ( log . With ( ctx , log . Str ( "serviceName" , name ) ) )
2022-08-01 15:52:08 +00:00
extraConf , err := p . getExtraConf ( tagsToNeutralLabels ( tags , p . Prefix ) )
2021-07-15 12:02:11 +00:00
if err != nil {
logger . Errorf ( "Skip service: %v" , err )
continue
}
2022-08-01 15:52:08 +00:00
if ! extraConf . Enable {
2021-07-15 12:02:11 +00:00
logger . Debug ( "Filtering disabled item" )
continue
}
matches , err := constraints . MatchTags ( tags , p . Constraints )
if err != nil {
logger . Errorf ( "Error matching constraint expressions: %v" , err )
continue
}
if ! matches {
logger . Debugf ( "Container pruned by constraint expressions: %q" , p . Constraints )
continue
}
2022-08-01 15:52:08 +00:00
if ! p . ConnectAware && extraConf . ConsulCatalog . Connect {
2021-07-15 12:02:11 +00:00
logger . Debugf ( "Filtering out Connect aware item, Connect support is not enabled" )
continue
}
2022-08-01 15:52:08 +00:00
consulServices , statuses , err := p . fetchService ( ctx , name , extraConf . ConsulCatalog . Connect )
2019-10-15 15:34:08 +00:00
if err != nil {
return nil , err
}
2020-11-17 16:30:03 +00:00
for _ , consulService := range consulServices {
2022-06-29 10:04:09 +00:00
address := consulService . Service . Address
2019-11-14 10:10:06 +00:00
if address == "" {
2022-06-29 10:04:09 +00:00
address = consulService . Node . Address
2019-11-14 10:10:06 +00:00
}
2022-06-29 10:04:09 +00:00
namespace := consulService . Service . Namespace
2021-07-15 12:02:11 +00:00
if namespace == "" {
namespace = "default"
}
2022-06-29 10:04:09 +00:00
status , exists := statuses [ consulService . Node . ID + consulService . Service . ID ]
2020-11-17 16:30:03 +00:00
if ! exists {
status = api . HealthAny
}
2019-10-15 15:34:08 +00:00
item := itemData {
2022-06-29 10:04:09 +00:00
ID : consulService . Service . ID ,
Node : consulService . Node . Node ,
Datacenter : consulService . Node . Datacenter ,
2021-07-15 12:02:11 +00:00
Namespace : namespace ,
Name : name ,
Address : address ,
2022-06-29 10:04:09 +00:00
Port : strconv . Itoa ( consulService . Service . Port ) ,
Labels : tagsToNeutralLabels ( consulService . Service . Tags , p . Prefix ) ,
Tags : consulService . Service . Tags ,
2021-07-15 12:02:11 +00:00
Status : status ,
2019-10-15 15:34:08 +00:00
}
2022-08-01 15:52:08 +00:00
extraConf , err := p . getExtraConf ( item . Labels )
2019-10-15 15:34:08 +00:00
if err != nil {
log . FromContext ( ctx ) . Errorf ( "Skip item %s: %v" , item . Name , err )
continue
}
item . ExtraConf = extraConf
data = append ( data , item )
}
}
2021-07-15 12:02:11 +00:00
2019-10-15 15:34:08 +00:00
return data , nil
}
2022-06-29 10:04:09 +00:00
func ( p * Provider ) fetchService ( ctx context . Context , name string , connectEnabled bool ) ( [ ] * api . ServiceEntry , map [ string ] string , error ) {
2019-10-15 15:34:08 +00:00
var tagFilter string
if ! p . ExposedByDefault {
tagFilter = p . Prefix + ".enable=true"
}
2022-01-24 14:30:05 +00:00
opts := & api . QueryOptions { AllowStale : p . Stale , RequireConsistent : p . RequireConsistent , UseCache : p . Cache }
2020-11-17 16:30:03 +00:00
opts = opts . WithContext ( ctx )
2019-12-19 10:00:07 +00:00
2021-07-15 12:02:11 +00:00
healthFunc := p . client . Health ( ) . Service
if connectEnabled {
healthFunc = p . client . Health ( ) . Connect
}
2022-06-29 10:04:09 +00:00
consulServices , _ , err := healthFunc ( name , tagFilter , false , opts )
2020-11-17 16:30:03 +00:00
if err != nil {
return nil , nil , err
}
// Index status by service and node so it can be retrieved from a CatalogService even if the health and services
// are not in sync.
statuses := make ( map [ string ] string )
2022-06-29 10:04:09 +00:00
for _ , health := range consulServices {
2020-11-17 16:30:03 +00:00
if health . Service == nil || health . Node == nil {
continue
}
statuses [ health . Node . ID + health . Service . ID ] = health . Checks . AggregatedStatus ( )
}
return consulServices , statuses , err
2019-10-15 15:34:08 +00:00
}
2022-01-28 16:16:07 +00:00
// watchServices watches for update events of the services list and statuses,
// and transmits them to the caller through the p.watchServicesChan.
func ( p * Provider ) watchServices ( ctx context . Context ) error {
servicesWatcher , err := watch . Parse ( map [ string ] interface { } { "type" : "services" } )
if err != nil {
return fmt . Errorf ( "failed to create services watcher plan: %w" , err )
}
servicesWatcher . HybridHandler = func ( _ watch . BlockingParamVal , _ interface { } ) {
select {
case <- ctx . Done ( ) :
case p . watchServicesChan <- struct { } { } :
default :
// Event chan is full, discard event.
}
}
checksWatcher , err := watch . Parse ( map [ string ] interface { } { "type" : "checks" } )
if err != nil {
return fmt . Errorf ( "failed to create checks watcher plan: %w" , err )
}
checksWatcher . HybridHandler = func ( _ watch . BlockingParamVal , _ interface { } ) {
select {
case <- ctx . Done ( ) :
case p . watchServicesChan <- struct { } { } :
default :
// Event chan is full, discard event.
}
}
logger := hclog . New ( & hclog . LoggerOptions {
Name : "consulcatalog" ,
Level : hclog . LevelFromString ( logrus . GetLevel ( ) . String ( ) ) ,
JSONFormat : true ,
} )
errChan := make ( chan error , 2 )
defer func ( ) {
servicesWatcher . Stop ( )
checksWatcher . Stop ( )
} ( )
go func ( ) {
errChan <- servicesWatcher . RunWithClientAndHclog ( p . client , logger )
} ( )
go func ( ) {
errChan <- checksWatcher . RunWithClientAndHclog ( p . client , logger )
} ( )
select {
case <- ctx . Done ( ) :
return nil
case err = <- errChan :
return fmt . Errorf ( "services or checks watcher terminated: %w" , err )
}
}
2021-07-15 12:02:11 +00:00
func rootsWatchHandler ( ctx context . Context , dest chan <- [ ] string ) func ( watch . BlockingParamVal , interface { } ) {
return func ( _ watch . BlockingParamVal , raw interface { } ) {
if raw == nil {
log . FromContext ( ctx ) . Errorf ( "Root certificate watcher called with nil" )
return
}
2020-02-13 09:26:04 +00:00
2021-07-15 12:02:11 +00:00
v , ok := raw . ( * api . CARootList )
if ! ok || v == nil {
log . FromContext ( ctx ) . Errorf ( "Invalid result for root certificate watcher" )
return
}
2020-02-13 09:26:04 +00:00
2021-07-15 12:02:11 +00:00
roots := make ( [ ] string , 0 , len ( v . Roots ) )
for _ , root := range v . Roots {
roots = append ( roots , root . RootCertPEM )
2020-02-13 09:26:04 +00:00
}
2022-01-28 16:16:07 +00:00
select {
case <- ctx . Done ( ) :
case dest <- roots :
}
2021-07-15 12:02:11 +00:00
}
}
type keyPair struct {
cert string
key string
}
func leafWatcherHandler ( ctx context . Context , dest chan <- keyPair ) func ( watch . BlockingParamVal , interface { } ) {
return func ( _ watch . BlockingParamVal , raw interface { } ) {
if raw == nil {
log . FromContext ( ctx ) . Errorf ( "Leaf certificate watcher called with nil" )
return
2020-02-13 09:26:04 +00:00
}
2021-07-15 12:02:11 +00:00
v , ok := raw . ( * api . LeafCert )
if ! ok || v == nil {
log . FromContext ( ctx ) . Errorf ( "Invalid result for leaf certificate watcher" )
return
2020-02-13 09:26:04 +00:00
}
2022-01-28 16:16:07 +00:00
kp := keyPair {
2021-07-15 12:02:11 +00:00
cert : v . CertPEM ,
key : v . PrivateKeyPEM ,
2020-02-13 09:26:04 +00:00
}
2022-01-28 16:16:07 +00:00
select {
case <- ctx . Done ( ) :
case dest <- kp :
}
2021-07-15 12:02:11 +00:00
}
}
2022-01-28 16:16:07 +00:00
// watchConnectTLS watches for updates of the root certificate or the leaf
// certificate, and transmits them to the caller via p.certChan.
func ( p * Provider ) watchConnectTLS ( ctx context . Context ) error {
leafChan := make ( chan keyPair )
2021-07-15 12:02:11 +00:00
leafWatcher , err := watch . Parse ( map [ string ] interface { } {
"type" : "connect_leaf" ,
"service" : p . ServiceName ,
} )
if err != nil {
2022-01-28 16:16:07 +00:00
return fmt . Errorf ( "failed to create leaf cert watcher plan: %w" , err )
2021-07-15 12:02:11 +00:00
}
2022-01-28 16:16:07 +00:00
leafWatcher . HybridHandler = leafWatcherHandler ( ctx , leafChan )
2020-02-13 09:26:04 +00:00
2022-01-28 16:16:07 +00:00
rootsChan := make ( chan [ ] string )
rootsWatcher , err := watch . Parse ( map [ string ] interface { } {
2021-07-15 12:02:11 +00:00
"type" : "connect_roots" ,
} )
if err != nil {
2022-01-28 16:16:07 +00:00
return fmt . Errorf ( "failed to create roots cert watcher plan: %w" , err )
2020-02-13 09:26:04 +00:00
}
2022-01-28 16:16:07 +00:00
rootsWatcher . HybridHandler = rootsWatchHandler ( ctx , rootsChan )
2020-02-13 09:26:04 +00:00
2022-01-28 16:16:07 +00:00
hclogger := hclog . New ( & hclog . LoggerOptions {
2021-07-15 12:02:11 +00:00
Name : "consulcatalog" ,
Level : hclog . LevelFromString ( logrus . GetLevel ( ) . String ( ) ) ,
JSONFormat : true ,
2022-01-28 16:16:07 +00:00
} )
2021-07-15 12:02:11 +00:00
2022-01-28 16:16:07 +00:00
errChan := make ( chan error , 2 )
defer func ( ) {
leafWatcher . Stop ( )
rootsWatcher . Stop ( )
} ( )
2021-07-15 12:02:11 +00:00
go func ( ) {
2022-01-28 16:16:07 +00:00
errChan <- leafWatcher . RunWithClientAndHclog ( p . client , hclogger )
2021-07-15 12:02:11 +00:00
} ( )
go func ( ) {
2022-01-28 16:16:07 +00:00
errChan <- rootsWatcher . RunWithClientAndHclog ( p . client , hclogger )
2021-07-15 12:02:11 +00:00
} ( )
var (
certInfo * connectCert
leafCerts keyPair
rootCerts [ ] string
)
for {
select {
case <- ctx . Done ( ) :
2022-01-28 16:16:07 +00:00
return nil
case err := <- errChan :
return fmt . Errorf ( "leaf or roots watcher terminated: %w" , err )
case rootCerts = <- rootsChan :
2021-07-15 12:02:11 +00:00
case leafCerts = <- leafChan :
}
2022-01-28 16:16:07 +00:00
2021-07-15 12:02:11 +00:00
newCertInfo := & connectCert {
root : rootCerts ,
leaf : leafCerts ,
}
if newCertInfo . isReady ( ) && ! newCertInfo . equals ( certInfo ) {
2022-01-28 16:16:07 +00:00
log . FromContext ( ctx ) . Debugf ( "Updating connect certs for service %s" , p . ServiceName )
2021-07-15 12:02:11 +00:00
certInfo = newCertInfo
2022-01-28 16:16:07 +00:00
select {
case <- ctx . Done ( ) :
case p . certChan <- newCertInfo :
}
2020-02-13 09:26:04 +00:00
}
}
2019-10-15 15:34:08 +00:00
}
2022-01-24 14:30:05 +00:00
func createClient ( namespace string , endpoint * EndpointConfig ) ( * api . Client , error ) {
2019-10-15 15:34:08 +00:00
config := api . Config {
2022-01-24 14:30:05 +00:00
Address : endpoint . Address ,
Scheme : endpoint . Scheme ,
Datacenter : endpoint . DataCenter ,
WaitTime : time . Duration ( endpoint . EndpointWaitTime ) ,
Token : endpoint . Token ,
Namespace : namespace ,
2019-10-15 15:34:08 +00:00
}
2022-01-24 14:30:05 +00:00
if endpoint . HTTPAuth != nil {
2019-10-15 15:34:08 +00:00
config . HttpAuth = & api . HttpBasicAuth {
2022-01-24 14:30:05 +00:00
Username : endpoint . HTTPAuth . Username ,
Password : endpoint . HTTPAuth . Password ,
2019-10-15 15:34:08 +00:00
}
}
2022-01-24 14:30:05 +00:00
if endpoint . TLS != nil {
2019-10-15 15:34:08 +00:00
config . TLSConfig = api . TLSConfig {
2022-01-24 14:30:05 +00:00
Address : endpoint . Address ,
CAFile : endpoint . TLS . CA ,
CertFile : endpoint . TLS . Cert ,
KeyFile : endpoint . TLS . Key ,
InsecureSkipVerify : endpoint . TLS . InsecureSkipVerify ,
2019-10-15 15:34:08 +00:00
}
}
return api . NewClient ( & config )
}
2022-01-28 16:16:07 +00:00
func repeatSend ( ctx context . Context , interval time . Duration , c chan <- struct { } ) {
ticker := time . NewTicker ( interval )
defer ticker . Stop ( )
for {
select {
case <- ctx . Done ( ) :
return
case <- ticker . C :
select {
case <- ctx . Done ( ) :
return
case c <- struct { } { } :
default :
// Chan is full, discard event.
}
}
}
}