traefik/pkg/provider/rancher/rancher.go
2022-11-30 09:50:05 +01:00

232 lines
7.8 KiB
Go

package rancher
import (
"context"
"fmt"
"text/template"
"time"
"github.com/cenkalti/backoff/v4"
rancher "github.com/rancher/go-rancher-metadata/metadata"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/job"
"github.com/traefik/traefik/v2/pkg/logs"
"github.com/traefik/traefik/v2/pkg/provider"
"github.com/traefik/traefik/v2/pkg/safe"
)
const (
// DefaultTemplateRule The default template for the default rule.
DefaultTemplateRule = "Host(`{{ normalize .Name }}`)"
)
// Health.
const (
healthy = "healthy"
updatingHealthy = "updating-healthy"
)
// States.
const (
active = "active"
running = "running"
upgraded = "upgraded"
upgrading = "upgrading"
updatingActive = "updating-active"
updatingRunning = "updating-running"
)
var _ provider.Provider = (*Provider)(nil)
// Provider holds configurations of the provider.
type Provider struct {
Constraints string `description:"Constraints is an expression that Traefik matches against the container's labels to determine whether to create any route for that container." json:"constraints,omitempty" toml:"constraints,omitempty" yaml:"constraints,omitempty" export:"true"`
Watch bool `description:"Watch provider." json:"watch,omitempty" toml:"watch,omitempty" yaml:"watch,omitempty" export:"true"`
DefaultRule string `description:"Default rule." json:"defaultRule,omitempty" toml:"defaultRule,omitempty" yaml:"defaultRule,omitempty"`
ExposedByDefault bool `description:"Expose containers by default." json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:"exposedByDefault,omitempty" export:"true"`
EnableServiceHealthFilter bool `description:"Filter services with unhealthy states and inactive states." json:"enableServiceHealthFilter,omitempty" toml:"enableServiceHealthFilter,omitempty" yaml:"enableServiceHealthFilter,omitempty" export:"true"`
RefreshSeconds int `description:"Defines the polling interval in seconds." json:"refreshSeconds,omitempty" toml:"refreshSeconds,omitempty" yaml:"refreshSeconds,omitempty" export:"true"`
IntervalPoll bool `description:"Poll the Rancher metadata service every 'rancher.refreshseconds' (less accurate)." json:"intervalPoll,omitempty" toml:"intervalPoll,omitempty" yaml:"intervalPoll,omitempty" export:"true"`
Prefix string `description:"Prefix used for accessing the Rancher metadata service." json:"prefix,omitempty" toml:"prefix,omitempty" yaml:"prefix,omitempty"`
defaultRuleTpl *template.Template
}
// SetDefaults sets the default values.
func (p *Provider) SetDefaults() {
p.Watch = true
p.ExposedByDefault = true
p.EnableServiceHealthFilter = true
p.RefreshSeconds = 15
p.DefaultRule = DefaultTemplateRule
p.Prefix = "latest"
}
type rancherData struct {
Name string
Labels map[string]string
Containers []string
Health string
State string
Port string
ExtraConf configuration
}
// Init the provider.
func (p *Provider) Init() error {
defaultRuleTpl, err := provider.MakeDefaultRuleTemplate(p.DefaultRule, nil)
if err != nil {
return fmt.Errorf("error while parsing default rule: %w", err)
}
p.defaultRuleTpl = defaultRuleTpl
return nil
}
func (p *Provider) createClient(ctx context.Context) (rancher.Client, error) {
metadataServiceURL := fmt.Sprintf("http://rancher-metadata.rancher.internal/%s", p.Prefix)
client, err := rancher.NewClientAndWait(metadataServiceURL)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Failed to create Rancher metadata service client")
return nil, err
}
return client, nil
}
// Provide allows the rancher provider to provide configurations to traefik using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
pool.GoCtx(func(routineCtx context.Context) {
logger := log.Ctx(routineCtx).With().Str(logs.ProviderName, "rancher").Logger()
ctxLog := logger.WithContext(routineCtx)
operation := func() error {
client, err := p.createClient(ctxLog)
if err != nil {
logger.Error().Err(err).Msg("Failed to create the metadata client metadata service")
return err
}
updateConfiguration := func(_ string) {
stacks, err := client.GetStacks()
if err != nil {
logger.Error().Err(err).Msg("Failed to query Rancher metadata service")
return
}
rancherData := p.parseMetadataSourcedRancherData(ctxLog, stacks)
logger.Printf("Received Rancher data %+v", rancherData)
configuration := p.buildConfiguration(ctxLog, rancherData)
configurationChan <- dynamic.Message{
ProviderName: "rancher",
Configuration: configuration,
}
}
updateConfiguration("init")
if p.Watch {
if p.IntervalPoll {
p.intervalPoll(ctxLog, client, updateConfiguration)
} else {
// Long polling should be favored for the most accurate configuration updates.
// Holds the connection until there is either a change in the metadata repository or `p.RefreshSeconds` has elapsed.
client.OnChangeCtx(ctxLog, p.RefreshSeconds, updateConfiguration)
}
}
return nil
}
notify := func(err error, time time.Duration) {
logger.Error().Err(err).Msgf("Provider error, retrying in %s", time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog), notify)
if err != nil {
logger.Error().Err(err).Msg("Cannot retrieve data")
}
})
return nil
}
func (p *Provider) intervalPoll(ctx context.Context, client rancher.Client, updateConfiguration func(string)) {
ticker := time.NewTicker(time.Duration(p.RefreshSeconds) * time.Second)
defer ticker.Stop()
var version string
for {
select {
case <-ticker.C:
newVersion, err := client.GetVersion()
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Failed to create Rancher metadata service client")
} else if version != newVersion {
version = newVersion
updateConfiguration(version)
}
case <-ctx.Done():
return
}
}
}
func (p *Provider) parseMetadataSourcedRancherData(ctx context.Context, stacks []rancher.Stack) (rancherDataList []rancherData) {
for _, stack := range stacks {
for _, service := range stack.Services {
logger := log.Ctx(ctx).With().Str("stack", stack.Name).Str("service", service.Name).Logger()
ctxSvc := logger.WithContext(ctx)
servicePort := ""
if len(service.Ports) > 0 {
servicePort = service.Ports[0]
}
for _, port := range service.Ports {
logger.Debug().Msgf("Set Port %s", port)
}
var containerIPAddresses []string
for _, container := range service.Containers {
if containerFilter(ctxSvc, container.Name, container.HealthState, container.State) {
containerIPAddresses = append(containerIPAddresses, container.PrimaryIp)
}
}
service := rancherData{
Name: service.Name + "_" + stack.Name,
State: service.State,
Labels: service.Labels,
Port: servicePort,
Containers: containerIPAddresses,
}
extraConf, err := p.getConfiguration(service)
if err != nil {
logger.Error().Err(err).Msgf("Skip container %s", service.Name)
continue
}
service.ExtraConf = extraConf
rancherDataList = append(rancherDataList, service)
}
}
return rancherDataList
}
func containerFilter(ctx context.Context, name, healthState, state string) bool {
logger := log.Ctx(ctx)
if healthState != "" && healthState != healthy && healthState != updatingHealthy {
logger.Debug().Msgf("Filtering container %s with healthState of %s", name, healthState)
return false
}
if state != "" && state != running && state != updatingRunning && state != upgraded {
logger.Debug().Msgf("Filtering container %s with state of %s", name, state)
return false
}
return true
}