traefik/provider/ecs/ecs.go

443 lines
12 KiB
Go
Raw Normal View History

package ecs
2017-01-05 14:24:17 +00:00
import (
"context"
"fmt"
"strings"
"time"
"github.com/BurntSushi/ty/fun"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/provider/label"
2017-01-05 14:24:17 +00:00
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
)
// Compile-time check that *Provider implements the provider.Provider interface.
var _ provider.Provider = (*Provider)(nil)
2017-01-05 14:24:17 +00:00
// Provider holds configurations of the provider.
type Provider struct {
2017-10-02 08:32:02 +00:00
provider.BaseProvider `mapstructure:",squash" export:"true"`
2017-01-05 14:24:17 +00:00
Domain string `description:"Default domain used"`
2017-10-02 08:32:02 +00:00
ExposedByDefault bool `description:"Expose containers by default" export:"true"`
RefreshSeconds int `description:"Polling interval (in seconds)" export:"true"`
2017-01-05 14:24:17 +00:00
// Provider lookup parameters
2017-08-22 09:46:03 +00:00
Clusters Clusters `description:"ECS Clusters name"`
Cluster string `description:"deprecated - ECS Cluster name"` // deprecated
2017-10-02 08:32:02 +00:00
AutoDiscoverClusters bool `description:"Auto discover cluster" export:"true"`
Region string `description:"The AWS region to use for requests" export:"true"`
2017-08-22 09:46:03 +00:00
AccessKeyID string `description:"The AWS credentials access key to use for making requests"`
SecretAccessKey string `description:"The AWS credentials access key to use for making requests"`
2017-01-05 14:24:17 +00:00
}
// ecsInstance describes one container of one ECS task together with the
// AWS objects it was resolved from.
type ecsInstance struct {
// Name is derived from the task group and the container name.
Name string
// ID is the trailing part of the task ARN.
ID string
// task is the ECS task the container belongs to.
task *ecs.Task
// taskDefinition is the definition the task was started from.
taskDefinition *ecs.TaskDefinition
// container is the container as reported in the task description.
container *ecs.Container
// containerDefinition is the entry of taskDefinition whose name matches
// container; it stays nil when no entry matches.
containerDefinition *ecs.ContainerDefinition
// machine is the EC2 instance hosting the task; it may be nil when the
// instance could not be resolved.
machine *ec2.Instance
}
// awsClient bundles the AWS service clients the provider talks to.
type awsClient struct {
// ecs is the client used for the ECS API calls.
ecs *ecs.ECS
// ec2 is the client used for the EC2 API calls.
ec2 *ec2.EC2
}
func (p *Provider) createClient() (*awsClient, error) {
2017-01-05 14:24:17 +00:00
sess := session.New()
ec2meta := ec2metadata.New(sess)
if p.Region == "" {
2017-01-05 14:24:17 +00:00
log.Infoln("No EC2 region provided, querying instance metadata endpoint...")
identity, err := ec2meta.GetInstanceIdentityDocument()
if err != nil {
return nil, err
}
p.Region = identity.Region
2017-01-05 14:24:17 +00:00
}
cfg := &aws.Config{
Region: &p.Region,
2017-01-05 14:24:17 +00:00
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&credentials.StaticProvider{
Value: credentials.Value{
AccessKeyID: p.AccessKeyID,
SecretAccessKey: p.SecretAccessKey,
2017-01-05 14:24:17 +00:00
},
},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
defaults.RemoteCredProvider(*(defaults.Config()), defaults.Handlers()),
}),
}
2017-07-07 21:48:53 +00:00
if p.Trace {
cfg.WithLogger(aws.LoggerFunc(func(args ...interface{}) {
log.Debug(args...)
}))
}
2017-01-05 14:24:17 +00:00
return &awsClient{
ecs.New(sess, cfg),
ec2.New(sess, cfg),
}, nil
}
// Provide allows the ecs provider to provide configurations to traefik
2017-01-05 14:24:17 +00:00
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
2017-01-05 14:24:17 +00:00
p.Constraints = append(p.Constraints, constraints...)
2017-01-05 14:24:17 +00:00
handleCanceled := func(ctx context.Context, err error) error {
if ctx.Err() == context.Canceled || err == context.Canceled {
return nil
}
return err
}
pool.Go(func(stop chan bool) {
ctx, cancel := context.WithCancel(context.Background())
safe.Go(func() {
2017-01-05 14:24:17 +00:00
select {
case <-stop:
cancel()
}
})
2017-01-05 14:24:17 +00:00
operation := func() error {
2017-10-10 09:10:02 +00:00
awsClient, err := p.createClient()
2017-01-05 14:24:17 +00:00
if err != nil {
return err
}
2017-10-10 09:10:02 +00:00
configuration, err := p.loadECSConfig(ctx, awsClient)
2017-01-05 14:24:17 +00:00
if err != nil {
return handleCanceled(ctx, err)
}
configurationChan <- types.ConfigMessage{
ProviderName: "ecs",
Configuration: configuration,
}
if p.Watch {
reload := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
2017-01-05 14:24:17 +00:00
defer reload.Stop()
for {
select {
case <-reload.C:
2017-10-10 09:10:02 +00:00
configuration, err := p.loadECSConfig(ctx, awsClient)
2017-01-05 14:24:17 +00:00
if err != nil {
return handleCanceled(ctx, err)
}
configurationChan <- types.ConfigMessage{
ProviderName: "ecs",
Configuration: configuration,
}
case <-ctx.Done():
return handleCanceled(ctx, ctx.Err())
}
}
}
return nil
}
notify := func(err error, time time.Duration) {
log.Errorf("Provider connection error %+v, retrying in %s", err, time)
2017-01-05 14:24:17 +00:00
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Provider api %+v", err)
2017-01-05 14:24:17 +00:00
}
})
return nil
}
// wrapAws attaches ctx to the HTTP request carried by req and sends it,
// returning whatever error the send reports.
func wrapAws(ctx context.Context, req *request.Request) error {
	httpReq := req.HTTPRequest.WithContext(ctx)
	req.HTTPRequest = httpReq

	return req.Send()
}
2017-10-25 15:18:03 +00:00
func (p *Provider) loadECSConfig(ctx context.Context, client *awsClient) (*types.Configuration, error) {
instances, err := p.listInstances(ctx, client)
2017-01-05 14:24:17 +00:00
if err != nil {
return nil, err
}
instances = fun.Filter(p.filterInstance, instances).([]ecsInstance)
2017-01-05 14:24:17 +00:00
2017-08-25 09:42:03 +00:00
services := make(map[string][]ecsInstance)
for _, instance := range instances {
if serviceInstances, ok := services[instance.Name]; ok {
services[instance.Name] = append(serviceInstances, instance)
} else {
services[instance.Name] = []ecsInstance{instance}
}
}
return p.buildConfiguration(services)
2017-01-05 14:24:17 +00:00
}
// Find all running Provider tasks in a cluster, also collect the task definitions (for docker labels)
2017-01-05 14:24:17 +00:00
// and the EC2 instance data
func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) {
2017-08-22 09:46:03 +00:00
var instances []ecsInstance
var clustersArn []*string
var clusters Clusters
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
if p.AutoDiscoverClusters {
input := &ecs.ListClustersInput{}
for {
result, err := client.ecs.ListClusters(input)
if err != nil {
return nil, err
}
if result != nil {
clustersArn = append(clustersArn, result.ClusterArns...)
input.NextToken = result.NextToken
if result.NextToken == nil {
break
}
} else {
break
}
2017-01-05 14:24:17 +00:00
}
2017-08-22 09:46:03 +00:00
for _, carns := range clustersArn {
clusters = append(clusters, *carns)
}
} else if p.Cluster != "" {
// TODO: Deprecated configuration - Need to be removed in the future
clusters = Clusters{p.Cluster}
log.Warn("Deprecated configuration found: ecs.cluster " +
"Please use ecs.clusters instead.")
} else {
clusters = p.Clusters
2017-01-05 14:24:17 +00:00
}
2017-08-22 09:46:03 +00:00
log.Debugf("ECS Clusters: %s", clusters)
for _, c := range clusters {
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
req, _ := client.ecs.ListTasksRequest(&ecs.ListTasksInput{
Cluster: &c,
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
})
2017-09-29 14:56:03 +00:00
var taskArns []*string
2017-08-22 09:46:03 +00:00
for ; req != nil; req = req.NextPage() {
if err := wrapAws(ctx, req); err != nil {
return nil, err
}
2017-08-22 09:46:03 +00:00
taskArns = append(taskArns, req.Data.(*ecs.ListTasksOutput).TaskArns...)
}
2017-09-29 14:56:03 +00:00
// Skip to the next cluster if there are no tasks found on
// this cluster.
2017-08-22 09:46:03 +00:00
if len(taskArns) == 0 {
2017-09-29 14:56:03 +00:00
continue
}
2017-01-05 14:24:17 +00:00
chunkedTaskArns := chunkedTaskArns(taskArns)
2017-08-22 09:46:03 +00:00
var tasks []*ecs.Task
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
for _, arns := range chunkedTaskArns {
req, taskResp := client.ecs.DescribeTasksRequest(&ecs.DescribeTasksInput{
Tasks: arns,
Cluster: &c,
})
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
if err := wrapAws(ctx, req); err != nil {
return nil, err
}
tasks = append(tasks, taskResp.Tasks...)
2017-01-05 14:24:17 +00:00
}
2017-08-22 09:46:03 +00:00
containerInstanceArns := make([]*string, 0)
byContainerInstance := make(map[string]int)
taskDefinitionArns := make([]*string, 0)
byTaskDefinition := make(map[string]int)
for _, task := range tasks {
if _, found := byContainerInstance[*task.ContainerInstanceArn]; !found {
byContainerInstance[*task.ContainerInstanceArn] = len(containerInstanceArns)
containerInstanceArns = append(containerInstanceArns, task.ContainerInstanceArn)
}
if _, found := byTaskDefinition[*task.TaskDefinitionArn]; !found {
byTaskDefinition[*task.TaskDefinitionArn] = len(taskDefinitionArns)
taskDefinitionArns = append(taskDefinitionArns, task.TaskDefinitionArn)
}
2017-01-05 14:24:17 +00:00
}
2017-08-22 09:46:03 +00:00
machines, err := p.lookupEc2Instances(ctx, client, &c, containerInstanceArns)
if err != nil {
return nil, err
}
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, taskDefinitionArns)
if err != nil {
return nil, err
}
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
for _, task := range tasks {
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
machineIdx := byContainerInstance[*task.ContainerInstanceArn]
taskDefIdx := byTaskDefinition[*task.TaskDefinitionArn]
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
for _, container := range task.Containers {
2017-01-05 14:24:17 +00:00
2017-08-22 09:46:03 +00:00
taskDefinition := taskDefinitions[taskDefIdx]
var containerDefinition *ecs.ContainerDefinition
for _, def := range taskDefinition.ContainerDefinitions {
if *container.Name == *def.Name {
containerDefinition = def
break
}
2017-01-05 14:24:17 +00:00
}
2017-08-22 09:46:03 +00:00
instances = append(instances, ecsInstance{
fmt.Sprintf("%s-%s", strings.Replace(*task.Group, ":", "-", 1), *container.Name),
(*task.TaskArn)[len(*task.TaskArn)-12:],
task,
taskDefinition,
container,
containerDefinition,
machines[machineIdx],
})
}
2017-01-05 14:24:17 +00:00
}
}
return instances, nil
}
2017-08-22 09:46:03 +00:00
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, containerArns []*string) ([]*ec2.Instance, error) {
2017-01-05 14:24:17 +00:00
order := make(map[string]int)
instanceIds := make([]*string, len(containerArns))
instances := make([]*ec2.Instance, len(containerArns))
2017-01-05 14:24:17 +00:00
for i, arn := range containerArns {
order[*arn] = i
}
req, _ := client.ecs.DescribeContainerInstancesRequest(&ecs.DescribeContainerInstancesInput{
ContainerInstances: containerArns,
2017-08-22 09:46:03 +00:00
Cluster: clusterName,
})
for ; req != nil; req = req.NextPage() {
if err := wrapAws(ctx, req); err != nil {
return nil, err
}
containerResp := req.Data.(*ecs.DescribeContainerInstancesOutput)
for i, container := range containerResp.ContainerInstances {
order[*container.Ec2InstanceId] = order[*container.ContainerInstanceArn]
instanceIds[i] = container.Ec2InstanceId
}
2017-01-05 14:24:17 +00:00
}
req, _ = client.ec2.DescribeInstancesRequest(&ec2.DescribeInstancesInput{
2017-01-05 14:24:17 +00:00
InstanceIds: instanceIds,
})
for ; req != nil; req = req.NextPage() {
if err := wrapAws(ctx, req); err != nil {
return nil, err
}
2017-01-05 14:24:17 +00:00
instancesResp := req.Data.(*ec2.DescribeInstancesOutput)
for _, r := range instancesResp.Reservations {
for _, i := range r.Instances {
if i.InstanceId != nil {
instances[order[*i.InstanceId]] = i
}
}
}
2017-01-05 14:24:17 +00:00
}
return instances, nil
}
func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns []*string) ([]*ecs.TaskDefinition, error) {
2017-01-05 14:24:17 +00:00
taskDefinitions := make([]*ecs.TaskDefinition, len(taskDefArns))
for i, arn := range taskDefArns {
req, resp := client.ecs.DescribeTaskDefinitionRequest(&ecs.DescribeTaskDefinitionInput{
TaskDefinition: arn,
})
if err := wrapAws(ctx, req); err != nil {
return nil, err
}
taskDefinitions[i] = resp.TaskDefinition
}
return taskDefinitions, nil
}
func (p *Provider) filterInstance(i ecsInstance) bool {
if labelPort := getStringValue(i, label.TraefikPort, ""); len(i.container.NetworkBindings) == 0 && labelPort == "" {
2017-01-05 14:24:17 +00:00
log.Debugf("Filtering ecs instance without port %s (%s)", i.Name, i.ID)
return false
}
if i.machine == nil || i.machine.State == nil || i.machine.State.Name == nil {
log.Debugf("Filtering ecs instance in an missing ec2 information %s (%s)", i.Name, i.ID)
return false
}
if *i.machine.State.Name != ec2.InstanceStateNameRunning {
log.Debugf("Filtering ecs instance in an incorrect state %s (%s) (state = %s)", i.Name, i.ID, *i.machine.State.Name)
return false
}
if i.machine.PrivateIpAddress == nil {
log.Debugf("Filtering ecs instance without an ip address %s (%s)", i.Name, i.ID)
return false
}
if !isEnabled(i, p.ExposedByDefault) {
log.Debugf("Filtering disabled ecs instance %s (%s)", i.Name, i.ID)
2017-01-05 14:24:17 +00:00
return false
}
return true
}
// chunkedTaskArns splits tasks into consecutive groups of at most 100
// elements, because no more than 100 task ARNs may be passed to a single
// DescribeTasks call. The order of the elements is preserved and an empty
// input yields no groups.
func chunkedTaskArns(tasks []*string) [][]*string {
	const chunkSize = 100

	var chunks [][]*string
	for rest := tasks; len(rest) > 0; {
		n := chunkSize
		if len(rest) < n {
			n = len(rest)
		}
		chunks = append(chunks, rest[:n])
		rest = rest[n:]
	}

	return chunks
}