333 lines
10 KiB
Go
333 lines
10 KiB
Go
|
package docker
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"net"
|
||
|
"strconv"
|
||
|
"time"
|
||
|
|
||
|
"github.com/cenkalti/backoff/v4"
|
||
|
dockertypes "github.com/docker/docker/api/types"
|
||
|
"github.com/docker/docker/api/types/filters"
|
||
|
swarmtypes "github.com/docker/docker/api/types/swarm"
|
||
|
"github.com/docker/docker/api/types/versions"
|
||
|
"github.com/docker/docker/client"
|
||
|
"github.com/rs/zerolog/log"
|
||
|
ptypes "github.com/traefik/paerser/types"
|
||
|
"github.com/traefik/traefik/v3/pkg/config/dynamic"
|
||
|
"github.com/traefik/traefik/v3/pkg/job"
|
||
|
"github.com/traefik/traefik/v3/pkg/logs"
|
||
|
"github.com/traefik/traefik/v3/pkg/provider"
|
||
|
"github.com/traefik/traefik/v3/pkg/safe"
|
||
|
)
|
||
|
|
||
|
// SwarmAPIVersion is a constant holding the version of the Provider API traefik will use.
|
||
|
const SwarmAPIVersion = "1.24"
|
||
|
|
||
|
const swarmName = "swarm"
|
||
|
|
||
|
var _ provider.Provider = (*SwarmProvider)(nil)
|
||
|
|
||
|
// SwarmProvider holds configurations of the provider.
|
||
|
type SwarmProvider struct {
|
||
|
Shared `yaml:",inline" export:"true"`
|
||
|
ClientConfig `yaml:",inline" export:"true"`
|
||
|
|
||
|
RefreshSeconds ptypes.Duration `description:"Polling interval for swarm mode." json:"refreshSeconds,omitempty" toml:"refreshSeconds,omitempty" yaml:"refreshSeconds,omitempty" export:"true"`
|
||
|
}
|
||
|
|
||
|
// SetDefaults sets the default values.
|
||
|
func (p *SwarmProvider) SetDefaults() {
|
||
|
p.Watch = true
|
||
|
p.ExposedByDefault = true
|
||
|
p.Endpoint = "unix:///var/run/docker.sock"
|
||
|
p.RefreshSeconds = ptypes.Duration(15 * time.Second)
|
||
|
p.DefaultRule = DefaultTemplateRule
|
||
|
}
|
||
|
|
||
|
// Init the provider.
|
||
|
func (p *SwarmProvider) Init() error {
|
||
|
defaultRuleTpl, err := provider.MakeDefaultRuleTemplate(p.DefaultRule, nil)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error while parsing default rule: %w", err)
|
||
|
}
|
||
|
|
||
|
p.defaultRuleTpl = defaultRuleTpl
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *SwarmProvider) createClient(ctx context.Context) (*client.Client, error) {
|
||
|
p.ClientConfig.apiVersion = SwarmAPIVersion
|
||
|
return createClient(ctx, p.ClientConfig)
|
||
|
}
|
||
|
|
||
|
// Provide allows the docker provider to provide configurations to traefik using the given configuration channel.
|
||
|
func (p *SwarmProvider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
|
||
|
pool.GoCtx(func(routineCtx context.Context) {
|
||
|
logger := log.Ctx(routineCtx).With().Str(logs.ProviderName, swarmName).Logger()
|
||
|
ctxLog := logger.WithContext(routineCtx)
|
||
|
|
||
|
operation := func() error {
|
||
|
var err error
|
||
|
ctx, cancel := context.WithCancel(ctxLog)
|
||
|
defer cancel()
|
||
|
|
||
|
ctx = log.Ctx(ctx).With().Str(logs.ProviderName, swarmName).Logger().WithContext(ctx)
|
||
|
|
||
|
dockerClient, err := p.createClient(ctx)
|
||
|
if err != nil {
|
||
|
logger.Error().Err(err).Msg("Failed to create Docker API client")
|
||
|
return err
|
||
|
}
|
||
|
defer func() { _ = dockerClient.Close() }()
|
||
|
|
||
|
builder := NewDynConfBuilder(p.Shared, dockerClient)
|
||
|
|
||
|
serverVersion, err := dockerClient.ServerVersion(ctx)
|
||
|
if err != nil {
|
||
|
logger.Error().Err(err).Msg("Failed to retrieve information of the docker client and server host")
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
logger.Debug().Msgf("Provider connection established with docker %s (API %s)", serverVersion.Version, serverVersion.APIVersion)
|
||
|
|
||
|
dockerDataList, err := p.listServices(ctx, dockerClient)
|
||
|
if err != nil {
|
||
|
logger.Error().Err(err).Msg("Failed to list services for docker swarm mode")
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
configuration := builder.build(ctxLog, dockerDataList)
|
||
|
configurationChan <- dynamic.Message{
|
||
|
ProviderName: swarmName,
|
||
|
Configuration: configuration,
|
||
|
}
|
||
|
if p.Watch {
|
||
|
errChan := make(chan error)
|
||
|
|
||
|
// TODO: This need to be change. Linked to Swarm events docker/docker#23827
|
||
|
ticker := time.NewTicker(time.Duration(p.RefreshSeconds))
|
||
|
|
||
|
pool.GoCtx(func(ctx context.Context) {
|
||
|
logger := log.Ctx(ctx).With().Str(logs.ProviderName, swarmName).Logger()
|
||
|
ctx = logger.WithContext(ctx)
|
||
|
|
||
|
defer close(errChan)
|
||
|
for {
|
||
|
select {
|
||
|
case <-ticker.C:
|
||
|
services, err := p.listServices(ctx, dockerClient)
|
||
|
if err != nil {
|
||
|
logger.Error().Err(err).Msg("Failed to list services for docker swarm mode")
|
||
|
errChan <- err
|
||
|
return
|
||
|
}
|
||
|
|
||
|
configuration := builder.build(ctx, services)
|
||
|
if configuration != nil {
|
||
|
configurationChan <- dynamic.Message{
|
||
|
ProviderName: swarmName,
|
||
|
Configuration: configuration,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
case <-ctx.Done():
|
||
|
ticker.Stop()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
})
|
||
|
if err, ok := <-errChan; ok {
|
||
|
return err
|
||
|
}
|
||
|
// channel closed
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
notify := func(err error, time time.Duration) {
|
||
|
logger.Error().Err(err).Msgf("Provider error, retrying in %s", time)
|
||
|
}
|
||
|
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog), notify)
|
||
|
if err != nil {
|
||
|
logger.Error().Err(err).Msg("Cannot retrieve data")
|
||
|
}
|
||
|
})
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *SwarmProvider) listServices(ctx context.Context, dockerClient client.APIClient) ([]dockerData, error) {
|
||
|
logger := log.Ctx(ctx)
|
||
|
|
||
|
serviceList, err := dockerClient.ServiceList(ctx, dockertypes.ServiceListOptions{})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
serverVersion, err := dockerClient.ServerVersion(ctx)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
networkListArgs := filters.NewArgs()
|
||
|
// https://docs.docker.com/engine/api/v1.29/#tag/Network (Docker 17.06)
|
||
|
if versions.GreaterThanOrEqualTo(serverVersion.APIVersion, "1.29") {
|
||
|
networkListArgs.Add("scope", "swarm")
|
||
|
} else {
|
||
|
networkListArgs.Add("driver", "overlay")
|
||
|
}
|
||
|
|
||
|
networkList, err := dockerClient.NetworkList(ctx, dockertypes.NetworkListOptions{Filters: networkListArgs})
|
||
|
if err != nil {
|
||
|
logger.Debug().Err(err).Msg("Failed to network inspect on client for docker")
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
networkMap := make(map[string]*dockertypes.NetworkResource)
|
||
|
for _, network := range networkList {
|
||
|
networkToAdd := network
|
||
|
networkMap[network.ID] = &networkToAdd
|
||
|
}
|
||
|
|
||
|
var dockerDataList []dockerData
|
||
|
var dockerDataListTasks []dockerData
|
||
|
|
||
|
for _, service := range serviceList {
|
||
|
dData, err := p.parseService(ctx, service, networkMap)
|
||
|
if err != nil {
|
||
|
logger.Error().Err(err).Msgf("Skip container %s", getServiceName(dData))
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if dData.ExtraConf.Docker.LBSwarm {
|
||
|
if len(dData.NetworkSettings.Networks) > 0 {
|
||
|
dockerDataList = append(dockerDataList, dData)
|
||
|
}
|
||
|
} else {
|
||
|
isGlobalSvc := service.Spec.Mode.Global != nil
|
||
|
dockerDataListTasks, err = listTasks(ctx, dockerClient, service.ID, dData, networkMap, isGlobalSvc)
|
||
|
if err != nil {
|
||
|
logger.Warn().Err(err).Send()
|
||
|
} else {
|
||
|
dockerDataList = append(dockerDataList, dockerDataListTasks...)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return dockerDataList, err
|
||
|
}
|
||
|
|
||
|
func (p *SwarmProvider) parseService(ctx context.Context, service swarmtypes.Service, networkMap map[string]*dockertypes.NetworkResource) (dockerData, error) {
|
||
|
logger := log.Ctx(ctx)
|
||
|
|
||
|
dData := dockerData{
|
||
|
ID: service.ID,
|
||
|
ServiceName: service.Spec.Annotations.Name,
|
||
|
Name: service.Spec.Annotations.Name,
|
||
|
Labels: service.Spec.Annotations.Labels,
|
||
|
NetworkSettings: networkSettings{},
|
||
|
}
|
||
|
|
||
|
extraConf, err := p.extractLabels(dData)
|
||
|
if err != nil {
|
||
|
return dockerData{}, err
|
||
|
}
|
||
|
dData.ExtraConf = extraConf
|
||
|
|
||
|
if service.Spec.EndpointSpec != nil {
|
||
|
if service.Spec.EndpointSpec.Mode == swarmtypes.ResolutionModeDNSRR {
|
||
|
if dData.ExtraConf.Docker.LBSwarm {
|
||
|
logger.Warn().Msgf("Ignored %s endpoint-mode not supported, service name: %s. Fallback to Traefik load balancing", swarmtypes.ResolutionModeDNSRR, service.Spec.Annotations.Name)
|
||
|
}
|
||
|
} else if service.Spec.EndpointSpec.Mode == swarmtypes.ResolutionModeVIP {
|
||
|
dData.NetworkSettings.Networks = make(map[string]*networkData)
|
||
|
for _, virtualIP := range service.Endpoint.VirtualIPs {
|
||
|
networkService := networkMap[virtualIP.NetworkID]
|
||
|
if networkService != nil {
|
||
|
if len(virtualIP.Addr) > 0 {
|
||
|
ip, _, _ := net.ParseCIDR(virtualIP.Addr)
|
||
|
network := &networkData{
|
||
|
Name: networkService.Name,
|
||
|
ID: virtualIP.NetworkID,
|
||
|
Addr: ip.String(),
|
||
|
}
|
||
|
dData.NetworkSettings.Networks[network.Name] = network
|
||
|
} else {
|
||
|
logger.Debug().Msgf("No virtual IPs found in network %s", virtualIP.NetworkID)
|
||
|
}
|
||
|
} else {
|
||
|
logger.Debug().Msgf("Network not found, id: %s", virtualIP.NetworkID)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return dData, nil
|
||
|
}
|
||
|
|
||
|
func listTasks(ctx context.Context, dockerClient client.APIClient, serviceID string,
|
||
|
serviceDockerData dockerData, networkMap map[string]*dockertypes.NetworkResource, isGlobalSvc bool,
|
||
|
) ([]dockerData, error) {
|
||
|
serviceIDFilter := filters.NewArgs()
|
||
|
serviceIDFilter.Add("service", serviceID)
|
||
|
serviceIDFilter.Add("desired-state", "running")
|
||
|
|
||
|
taskList, err := dockerClient.TaskList(ctx, dockertypes.TaskListOptions{Filters: serviceIDFilter})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
var dockerDataList []dockerData
|
||
|
for _, task := range taskList {
|
||
|
if task.Status.State != swarmtypes.TaskStateRunning {
|
||
|
continue
|
||
|
}
|
||
|
dData := parseTasks(ctx, task, serviceDockerData, networkMap, isGlobalSvc)
|
||
|
if len(dData.NetworkSettings.Networks) > 0 {
|
||
|
dockerDataList = append(dockerDataList, dData)
|
||
|
}
|
||
|
}
|
||
|
return dockerDataList, err
|
||
|
}
|
||
|
|
||
|
func parseTasks(ctx context.Context, task swarmtypes.Task, serviceDockerData dockerData,
|
||
|
networkMap map[string]*dockertypes.NetworkResource, isGlobalSvc bool,
|
||
|
) dockerData {
|
||
|
dData := dockerData{
|
||
|
ID: task.ID,
|
||
|
ServiceName: serviceDockerData.Name,
|
||
|
Name: serviceDockerData.Name + "." + strconv.Itoa(task.Slot),
|
||
|
Labels: serviceDockerData.Labels,
|
||
|
ExtraConf: serviceDockerData.ExtraConf,
|
||
|
NetworkSettings: networkSettings{},
|
||
|
}
|
||
|
|
||
|
if isGlobalSvc {
|
||
|
dData.Name = serviceDockerData.Name + "." + task.ID
|
||
|
}
|
||
|
|
||
|
if task.NetworksAttachments != nil {
|
||
|
dData.NetworkSettings.Networks = make(map[string]*networkData)
|
||
|
for _, virtualIP := range task.NetworksAttachments {
|
||
|
if networkService, present := networkMap[virtualIP.Network.ID]; present {
|
||
|
if len(virtualIP.Addresses) > 0 {
|
||
|
// Not sure about this next loop - when would a task have multiple IP's for the same network?
|
||
|
for _, addr := range virtualIP.Addresses {
|
||
|
ip, _, _ := net.ParseCIDR(addr)
|
||
|
network := &networkData{
|
||
|
ID: virtualIP.Network.ID,
|
||
|
Name: networkService.Name,
|
||
|
Addr: ip.String(),
|
||
|
}
|
||
|
dData.NetworkSettings.Networks[network.Name] = network
|
||
|
}
|
||
|
} else {
|
||
|
log.Ctx(ctx).Debug().Msgf("No IP addresses found for network %s", virtualIP.Network.ID)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return dData
|
||
|
}
|