traefik/old/provider/ecs/ecs.go

443 lines
12 KiB
Go
Raw Normal View History

package ecs
2017-01-05 14:24:17 +00:00
import (
"context"
"fmt"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
2018-11-14 09:18:03 +00:00
"github.com/containous/traefik/old/log"
"github.com/containous/traefik/old/provider"
"github.com/containous/traefik/old/types"
2017-01-05 14:24:17 +00:00
"github.com/containous/traefik/safe"
"github.com/patrickmn/go-cache"
2017-01-05 14:24:17 +00:00
)
// Compile-time assertion that *Provider satisfies the provider.Provider interface.
var _ provider.Provider = (*Provider)(nil)
// existingTaskDefCache holds the *ecs.TaskDefinition values fetched by
// lookupTaskDefinitions so that repeated refreshes can skip the API call.
// It is built with cache.New using 30 minutes and 5 minutes as its two
// durations (go-cache's default expiration and cleanup interval, respectively).
var existingTaskDefCache = cache.New(30*time.Minute, 5*time.Minute)
2017-01-05 14:24:17 +00:00
// Provider holds configurations of the provider.
type Provider struct {
2017-10-02 08:32:02 +00:00
provider.BaseProvider `mapstructure:",squash" export:"true"`
2017-01-05 14:24:17 +00:00
Domain string `description:"Default domain used"`
2017-10-02 08:32:02 +00:00
ExposedByDefault bool `description:"Expose containers by default" export:"true"`
RefreshSeconds int `description:"Polling interval (in seconds)" export:"true"`
2017-01-05 14:24:17 +00:00
// Provider lookup parameters
2017-08-22 09:46:03 +00:00
Clusters Clusters `description:"ECS Clusters name"`
2017-10-02 08:32:02 +00:00
AutoDiscoverClusters bool `description:"Auto discover cluster" export:"true"`
Region string `description:"The AWS region to use for requests" export:"true"`
2017-08-22 09:46:03 +00:00
AccessKeyID string `description:"The AWS credentials access key to use for making requests"`
SecretAccessKey string `description:"The AWS credentials access key to use for making requests"`
2017-01-05 14:24:17 +00:00
}
type ecsInstance struct {
Name string
ID string
containerDefinition *ecs.ContainerDefinition
2018-05-28 16:52:03 +00:00
machine *machine
2018-03-28 00:13:48 +00:00
TraefikLabels map[string]string
2018-08-27 14:32:05 +00:00
SegmentLabels map[string]string
SegmentName string
2017-01-05 14:24:17 +00:00
}
2018-07-04 13:08:03 +00:00
// portMapping pairs a container port with the host port it is reachable on.
// listInstances fills it from either the container definition's port mappings
// or the container's network bindings.
type portMapping struct {
// containerPort is the port the application listens on inside the container.
containerPort int64
// hostPort is the corresponding port on the host side of the mapping.
hostPort int64
}
2018-05-28 16:52:03 +00:00
type machine struct {
state string
privateIP string
2018-07-04 13:08:03 +00:00
ports []portMapping
2018-05-28 16:52:03 +00:00
}
2017-01-05 14:24:17 +00:00
// awsClient bundles the two AWS service clients built by createClient.
type awsClient struct {
// ecs is the ECS API client used to list and describe clusters, tasks,
// task definitions and container instances.
ecs *ecs.ECS
// ec2 is the EC2 API client used to describe the instances backing the tasks.
ec2 *ec2.EC2
}
// Init initializes the provider by delegating to the embedded
// BaseProvider's Init with the given constraints, and returns its error.
func (p *Provider) Init(constraints types.Constraints) error {
return p.BaseProvider.Init(constraints)
}
func (p *Provider) createClient() (*awsClient, error) {
2018-02-22 13:58:04 +00:00
sess, err := session.NewSession()
if err != nil {
return nil, err
}
2017-01-05 14:24:17 +00:00
ec2meta := ec2metadata.New(sess)
if p.Region == "" {
2017-01-05 14:24:17 +00:00
log.Infoln("No EC2 region provided, querying instance metadata endpoint...")
identity, err := ec2meta.GetInstanceIdentityDocument()
if err != nil {
return nil, err
}
p.Region = identity.Region
2017-01-05 14:24:17 +00:00
}
cfg := &aws.Config{
Region: &p.Region,
2017-01-05 14:24:17 +00:00
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&credentials.StaticProvider{
Value: credentials.Value{
AccessKeyID: p.AccessKeyID,
SecretAccessKey: p.SecretAccessKey,
2017-01-05 14:24:17 +00:00
},
},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
defaults.RemoteCredProvider(*(defaults.Config()), defaults.Handlers()),
}),
}
2017-07-07 21:48:53 +00:00
if p.Trace {
cfg.WithLogger(aws.LoggerFunc(func(args ...interface{}) {
log.Debug(args...)
}))
}
2017-01-05 14:24:17 +00:00
return &awsClient{
2018-08-01 14:58:03 +00:00
ecs: ecs.New(sess, cfg),
ec2: ec2.New(sess, cfg),
2017-01-05 14:24:17 +00:00
}, nil
}
// Provide allows the ecs provider to provide configurations to traefik
2017-01-05 14:24:17 +00:00
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
2017-01-05 14:24:17 +00:00
handleCanceled := func(ctx context.Context, err error) error {
if ctx.Err() == context.Canceled || err == context.Canceled {
return nil
}
return err
}
pool.Go(func(stop chan bool) {
ctx, cancel := context.WithCancel(context.Background())
safe.Go(func() {
2017-12-06 09:52:03 +00:00
<-stop
cancel()
})
2017-01-05 14:24:17 +00:00
operation := func() error {
2017-10-10 09:10:02 +00:00
awsClient, err := p.createClient()
2017-01-05 14:24:17 +00:00
if err != nil {
return err
}
2017-10-10 09:10:02 +00:00
configuration, err := p.loadECSConfig(ctx, awsClient)
2017-01-05 14:24:17 +00:00
if err != nil {
return handleCanceled(ctx, err)
}
configurationChan <- types.ConfigMessage{
ProviderName: "ecs",
Configuration: configuration,
}
if p.Watch {
reload := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
2017-01-05 14:24:17 +00:00
defer reload.Stop()
for {
select {
case <-reload.C:
2017-10-10 09:10:02 +00:00
configuration, err := p.loadECSConfig(ctx, awsClient)
2017-01-05 14:24:17 +00:00
if err != nil {
return handleCanceled(ctx, err)
}
configurationChan <- types.ConfigMessage{
ProviderName: "ecs",
Configuration: configuration,
}
case <-ctx.Done():
return handleCanceled(ctx, ctx.Err())
}
}
}
return nil
}
notify := func(err error, time time.Duration) {
log.Errorf("Provider connection error %+v, retrying in %s", err, time)
2017-01-05 14:24:17 +00:00
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Provider api %+v", err)
2017-01-05 14:24:17 +00:00
}
})
return nil
}
// Find all running Provider tasks in a cluster, also collect the task definitions (for docker labels)
2017-01-05 14:24:17 +00:00
// and the EC2 instance data
func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) {
2017-08-22 09:46:03 +00:00
var clustersArn []*string
var clusters Clusters
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
if p.AutoDiscoverClusters {
input := &ecs.ListClustersInput{}
for {
result, err := client.ecs.ListClusters(input)
if err != nil {
return nil, err
}
if result != nil {
clustersArn = append(clustersArn, result.ClusterArns...)
input.NextToken = result.NextToken
if result.NextToken == nil {
break
}
} else {
break
}
2017-01-05 14:24:17 +00:00
}
2018-02-22 13:58:04 +00:00
for _, cArn := range clustersArn {
clusters = append(clusters, *cArn)
2017-08-22 09:46:03 +00:00
}
} else {
clusters = p.Clusters
2017-01-05 14:24:17 +00:00
}
2018-03-28 00:13:48 +00:00
var instances []ecsInstance
2017-08-22 09:46:03 +00:00
log.Debugf("ECS Clusters: %s", clusters)
2018-03-28 00:13:48 +00:00
2017-08-22 09:46:03 +00:00
for _, c := range clusters {
2017-01-05 14:24:17 +00:00
2018-05-28 16:52:03 +00:00
input := &ecs.ListTasksInput{
2017-08-22 09:46:03 +00:00
Cluster: &c,
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
2018-05-28 16:52:03 +00:00
}
tasks := make(map[string]*ecs.Task)
err := client.ecs.ListTasksPagesWithContext(ctx, input, func(page *ecs.ListTasksOutput, lastPage bool) bool {
if len(page.TaskArns) > 0 {
resp, err := client.ecs.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
Tasks: page.TaskArns,
Cluster: &c,
})
if err != nil {
2018-07-03 08:02:03 +00:00
log.Errorf("Unable to describe tasks for %v", page.TaskArns)
2018-05-28 16:52:03 +00:00
} else {
for _, t := range resp.Tasks {
if aws.StringValue(t.LastStatus) == ecs.DesiredStatusRunning {
tasks[aws.StringValue(t.TaskArn)] = t
}
2018-05-28 16:52:03 +00:00
}
}
2017-08-22 09:46:03 +00:00
}
2018-05-28 16:52:03 +00:00
return !lastPage
})
2018-05-28 16:52:03 +00:00
if err != nil {
log.Error("Unable to list tasks")
return nil, err
2017-08-22 09:46:03 +00:00
}
2017-09-29 14:56:03 +00:00
// Skip to the next cluster if there are no tasks found on
// this cluster.
2018-05-28 16:52:03 +00:00
if len(tasks) == 0 {
2017-09-29 14:56:03 +00:00
continue
}
2017-01-05 14:24:17 +00:00
2018-05-28 16:52:03 +00:00
ec2Instances, err := p.lookupEc2Instances(ctx, client, &c, tasks)
2017-08-22 09:46:03 +00:00
if err != nil {
return nil, err
}
2017-01-05 14:24:17 +00:00
2018-05-28 16:52:03 +00:00
taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, tasks)
2017-08-22 09:46:03 +00:00
if err != nil {
return nil, err
}
2017-01-05 14:24:17 +00:00
2018-05-28 16:52:03 +00:00
for key, task := range tasks {
2017-01-05 14:24:17 +00:00
2018-05-28 16:52:03 +00:00
containerInstance := ec2Instances[aws.StringValue(task.ContainerInstanceArn)]
taskDef := taskDefinitions[key]
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
for _, container := range task.Containers {
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
var containerDefinition *ecs.ContainerDefinition
2018-05-28 16:52:03 +00:00
for _, def := range taskDef.ContainerDefinitions {
2018-03-09 11:02:29 +00:00
if aws.StringValue(container.Name) == aws.StringValue(def.Name) {
2017-08-22 09:46:03 +00:00
containerDefinition = def
break
}
2017-01-05 14:24:17 +00:00
}
2018-05-28 16:52:03 +00:00
if containerDefinition == nil {
log.Debugf("Unable to find container definition for %s", aws.StringValue(container.Name))
continue
}
var mach *machine
if len(task.Attachments) != 0 {
2018-07-04 13:08:03 +00:00
var ports []portMapping
for _, mapping := range containerDefinition.PortMappings {
if mapping != nil {
ports = append(ports, portMapping{
hostPort: aws.Int64Value(mapping.HostPort),
containerPort: aws.Int64Value(mapping.ContainerPort),
})
}
2018-05-28 16:52:03 +00:00
}
mach = &machine{
privateIP: aws.StringValue(container.NetworkInterfaces[0].PrivateIpv4Address),
2018-07-04 13:08:03 +00:00
ports: ports,
2018-05-28 16:52:03 +00:00
state: aws.StringValue(task.LastStatus),
}
} else {
2018-07-04 13:08:03 +00:00
var ports []portMapping
for _, mapping := range container.NetworkBindings {
if mapping != nil {
ports = append(ports, portMapping{
hostPort: aws.Int64Value(mapping.HostPort),
containerPort: aws.Int64Value(mapping.ContainerPort),
})
}
2018-05-28 16:52:03 +00:00
}
mach = &machine{
privateIP: aws.StringValue(containerInstance.PrivateIpAddress),
2018-07-04 13:08:03 +00:00
ports: ports,
2018-05-28 16:52:03 +00:00
state: aws.StringValue(containerInstance.State.Name),
}
}
2017-08-22 09:46:03 +00:00
instances = append(instances, ecsInstance{
2018-03-28 00:13:48 +00:00
Name: fmt.Sprintf("%s-%s", strings.Replace(aws.StringValue(task.Group), ":", "-", 1), *container.Name),
2018-05-28 16:52:03 +00:00
ID: key[len(key)-12:],
2018-03-28 00:13:48 +00:00
containerDefinition: containerDefinition,
2018-05-28 16:52:03 +00:00
machine: mach,
2018-03-28 00:13:48 +00:00
TraefikLabels: aws.StringValueMap(containerDefinition.DockerLabels),
2017-08-22 09:46:03 +00:00
})
}
2017-01-05 14:24:17 +00:00
}
}
return instances, nil
}
2018-05-28 16:52:03 +00:00
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]*ecs.Task) (map[string]*ec2.Instance, error) {
instanceIds := make(map[string]string)
ec2Instances := make(map[string]*ec2.Instance)
2017-01-05 14:24:17 +00:00
2018-05-28 16:52:03 +00:00
var containerInstancesArns []*string
var instanceArns []*string
for _, task := range ecsDatas {
if task.ContainerInstanceArn != nil {
containerInstancesArns = append(containerInstancesArns, task.ContainerInstanceArn)
}
2017-01-05 14:24:17 +00:00
}
2018-07-17 10:26:03 +00:00
for _, arns := range p.chunkIDs(containerInstancesArns) {
resp, err := client.ecs.DescribeContainerInstancesWithContext(ctx, &ecs.DescribeContainerInstancesInput{
ContainerInstances: arns,
Cluster: clusterName,
})
2018-07-17 10:26:03 +00:00
if err != nil {
log.Errorf("Unable to describe container instances: %v", err)
return nil, err
}
2017-01-05 14:24:17 +00:00
2018-07-17 10:26:03 +00:00
for _, container := range resp.ContainerInstances {
instanceIds[aws.StringValue(container.Ec2InstanceId)] = aws.StringValue(container.ContainerInstanceArn)
instanceArns = append(instanceArns, container.Ec2InstanceId)
}
2018-05-28 16:52:03 +00:00
}
2017-01-05 14:24:17 +00:00
2018-05-28 16:52:03 +00:00
if len(instanceArns) > 0 {
2018-07-17 10:26:03 +00:00
for _, ids := range p.chunkIDs(instanceArns) {
input := &ec2.DescribeInstancesInput{
InstanceIds: ids,
}
2017-01-05 14:24:17 +00:00
2018-07-17 10:26:03 +00:00
err := client.ec2.DescribeInstancesPagesWithContext(ctx, input, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
if len(page.Reservations) > 0 {
for _, r := range page.Reservations {
for _, i := range r.Instances {
if i.InstanceId != nil {
ec2Instances[instanceIds[aws.StringValue(i.InstanceId)]] = i
}
2018-05-28 16:52:03 +00:00
}
}
}
2018-07-17 10:26:03 +00:00
return !lastPage
})
2018-05-28 16:52:03 +00:00
2018-07-17 10:26:03 +00:00
if err != nil {
2018-09-07 07:40:03 +00:00
log.Errorf("Unable to describe instances: %v", err)
2018-07-17 10:26:03 +00:00
return nil, err
}
}
2017-01-05 14:24:17 +00:00
}
2018-05-28 16:52:03 +00:00
return ec2Instances, nil
}
2017-01-05 14:24:17 +00:00
2018-05-28 16:52:03 +00:00
func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns map[string]*ecs.Task) (map[string]*ecs.TaskDefinition, error) {
taskDef := make(map[string]*ecs.TaskDefinition)
for arn, task := range taskDefArns {
if definition, ok := existingTaskDefCache.Get(arn); ok {
taskDef[arn] = definition.(*ecs.TaskDefinition)
log.Debugf("Found cached task definition for %s. Skipping the call", arn)
} else {
resp, err := client.ecs.DescribeTaskDefinitionWithContext(ctx, &ecs.DescribeTaskDefinitionInput{
TaskDefinition: task.TaskDefinitionArn,
})
2017-01-05 14:24:17 +00:00
if err != nil {
log.Errorf("Unable to describe task definition: %s", err)
return nil, err
}
2017-01-05 14:24:17 +00:00
taskDef[arn] = resp.TaskDefinition
existingTaskDefCache.Set(arn, resp.TaskDefinition, cache.DefaultExpiration)
}
2017-01-05 14:24:17 +00:00
}
2018-05-28 16:52:03 +00:00
return taskDef, nil
2017-01-05 14:24:17 +00:00
}
2018-04-11 10:26:03 +00:00
func (p *Provider) loadECSConfig(ctx context.Context, client *awsClient) (*types.Configuration, error) {
instances, err := p.listInstances(ctx, client)
if err != nil {
return nil, err
2017-01-05 14:24:17 +00:00
}
2018-04-11 10:26:03 +00:00
return p.buildConfiguration(instances)
2017-01-05 14:24:17 +00:00
}
2018-07-17 10:26:03 +00:00
// chunkIDs ECS expects no more than 100 parameters be passed to a API call;
// thus, pack each string into an array capped at 100 elements
func (p *Provider) chunkIDs(ids []*string) [][]*string {
var chuncked [][]*string
for i := 0; i < len(ids); i += 100 {
2018-08-01 14:58:03 +00:00
var sliceEnd int
2018-07-17 10:26:03 +00:00
if i+100 < len(ids) {
sliceEnd = i + 100
} else {
sliceEnd = len(ids)
}
chuncked = append(chuncked, ids[i:sliceEnd])
}
return chuncked
}