Add support for several ECS backends

This commit is contained in:
Michael 2017-08-22 11:46:03 +02:00 committed by Traefiker
parent 05665f4eec
commit 8765494cbd
7 changed files with 268 additions and 95 deletions

View file

@ -19,6 +19,7 @@ import (
"github.com/containous/traefik/acme" "github.com/containous/traefik/acme"
"github.com/containous/traefik/cluster" "github.com/containous/traefik/cluster"
"github.com/containous/traefik/log" "github.com/containous/traefik/log"
"github.com/containous/traefik/provider/ecs"
"github.com/containous/traefik/provider/kubernetes" "github.com/containous/traefik/provider/kubernetes"
"github.com/containous/traefik/provider/rancher" "github.com/containous/traefik/provider/rancher"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
@ -144,6 +145,7 @@ Complete documentation is available at https://traefik.io`,
f.AddParser(reflect.TypeOf(server.RootCAs{}), &server.RootCAs{}) f.AddParser(reflect.TypeOf(server.RootCAs{}), &server.RootCAs{})
f.AddParser(reflect.TypeOf(types.Constraints{}), &types.Constraints{}) f.AddParser(reflect.TypeOf(types.Constraints{}), &types.Constraints{})
f.AddParser(reflect.TypeOf(kubernetes.Namespaces{}), &kubernetes.Namespaces{}) f.AddParser(reflect.TypeOf(kubernetes.Namespaces{}), &kubernetes.Namespaces{})
f.AddParser(reflect.TypeOf(ecs.Clusters{}), &ecs.Clusters{})
f.AddParser(reflect.TypeOf([]acme.Domain{}), &acme.Domains{}) f.AddParser(reflect.TypeOf([]acme.Domain{}), &acme.Domains{})
f.AddParser(reflect.TypeOf(types.Buckets{}), &types.Buckets{}) f.AddParser(reflect.TypeOf(types.Buckets{}), &types.Buckets{})

View file

@ -1708,10 +1708,16 @@ Træfik can be configured to use Amazon ECS as a backend configuration:
# ECS Cluster Name # ECS Cluster Name
# #
# Optional # Deprecated - Please use Clusters
# Default: "default"
# #
Cluster = "default" # Cluster = "default"
# ECS Clusters Name
#
# Optional
# Default: ["default"]
#
Clusters = ["default"]
# Enable watch ECS changes # Enable watch ECS changes
# #
@ -1720,6 +1726,13 @@ Cluster = "default"
# #
Watch = true Watch = true
# Enable auto discover ECS clusters
#
# Optional
# Default: false
#
AutoDiscoverClusters = false
# Polling interval (in seconds) # Polling interval (in seconds)
# #
# Optional # Optional
@ -1780,6 +1793,8 @@ Træfik needs the following policy to read ECS information:
"Sid": "Traefik ECS read access", "Sid": "Traefik ECS read access",
"Effect": "Allow", "Effect": "Allow",
"Action": [ "Action": [
"ecs:ListClusters",
"ecs:DescribeClusters",
"ecs:ListTasks", "ecs:ListTasks",
"ecs:DescribeTasks", "ecs:DescribeTasks",
"ecs:DescribeContainerInstances", "ecs:DescribeContainerInstances",

32
provider/ecs/cluster.go Normal file
View file

@ -0,0 +1,32 @@
package ecs
import (
"fmt"
"strings"
)
// Clusters holds ecs clusters name
type Clusters []string
// Set adds strings elem into the the parser
// it splits str on , and ;
func (c *Clusters) Set(str string) error {
fargs := func(c rune) bool {
return c == ',' || c == ';'
}
// get function
slice := strings.FieldsFunc(str, fargs)
*c = append(*c, slice...)
return nil
}
// Get Clusters
func (c *Clusters) Get() interface{} { return Clusters(*c) }
// String return slice in a string
func (c *Clusters) String() string { return fmt.Sprintf("%v", *c) }
// SetValue sets Clusters into the parser
func (c *Clusters) SetValue(val interface{}) {
*c = Clusters(val.(Clusters))
}

View file

@ -0,0 +1,80 @@
package ecs
import (
"reflect"
"testing"
)
func TestClustersSet(t *testing.T) {
checkMap := map[string]Clusters{
"cluster": {"cluster"},
"cluster1,cluster2": {"cluster1", "cluster2"},
"cluster1;cluster2": {"cluster1", "cluster2"},
"cluster1,cluster2;cluster3": {"cluster1", "cluster2", "cluster3"},
}
for str, check := range checkMap {
var clusters Clusters
if err := clusters.Set(str); err != nil {
t.Fatalf("Error :%s", err)
}
if !reflect.DeepEqual(clusters, check) {
t.Fatalf("Expected:%s\ngot:%s", check, clusters)
}
}
}
func TestClustersGet(t *testing.T) {
slices := []Clusters{
{"cluster"},
{"cluster1", "cluster2"},
{"cluster1", "cluster2", "cluster3"},
}
check := []Clusters{
{"cluster"},
{"cluster1", "cluster2"},
{"cluster1", "cluster2", "cluster3"},
}
for i, slice := range slices {
if !reflect.DeepEqual(slice.Get(), check[i]) {
t.Fatalf("Expected:%s\ngot:%s", check[i], slice)
}
}
}
func TestClustersString(t *testing.T) {
slices := []Clusters{
{"cluster"},
{"cluster1", "cluster2"},
{"cluster1", "cluster2", "cluster3"},
}
check := []string{
"[cluster]",
"[cluster1 cluster2]",
"[cluster1 cluster2 cluster3]",
}
for i, slice := range slices {
if !reflect.DeepEqual(slice.String(), check[i]) {
t.Fatalf("Expected:%s\ngot:%s", check[i], slice)
}
}
}
func TestClustersSetValue(t *testing.T) {
check := []Clusters{
{"cluster"},
{"cluster1", "cluster2"},
{"cluster1", "cluster2", "cluster3"},
}
slices := []Clusters{
{"cluster"},
{"cluster1", "cluster2"},
{"cluster1", "cluster2", "cluster3"},
}
for i, s := range slices {
var slice Clusters
slice.SetValue(s)
if !reflect.DeepEqual(slice, check[i]) {
t.Fatalf("Expected:%s\ngot:%s", check[i], slice)
}
}
}

View file

@ -36,10 +36,12 @@ type Provider struct {
RefreshSeconds int `description:"Polling interval (in seconds)"` RefreshSeconds int `description:"Polling interval (in seconds)"`
// Provider lookup parameters // Provider lookup parameters
Cluster string `description:"ECS Cluster Name"` Clusters Clusters `description:"ECS Clusters name"`
Region string `description:"The AWS region to use for requests"` Cluster string `description:"deprecated - ECS Cluster name"` // deprecated
AccessKeyID string `description:"The AWS credentials access key to use for making requests"` AutoDiscoverClusters bool `description:"Auto discover cluster"`
SecretAccessKey string `description:"The AWS credentials access key to use for making requests"` Region string `description:"The AWS region to use for requests"`
AccessKeyID string `description:"The AWS credentials access key to use for making requests"`
SecretAccessKey string `description:"The AWS credentials access key to use for making requests"`
} }
type ecsInstance struct { type ecsInstance struct {
@ -200,103 +202,138 @@ func (p *Provider) loadECSConfig(ctx context.Context, client *awsClient) (*types
// and the EC2 instance data // and the EC2 instance data
func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) { func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) {
var taskArns []*string var taskArns []*string
req, _ := client.ecs.ListTasksRequest(&ecs.ListTasksInput{
Cluster: &p.Cluster,
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
})
for ; req != nil; req = req.NextPage() {
if err := wrapAws(ctx, req); err != nil {
return nil, err
}
taskArns = append(taskArns, req.Data.(*ecs.ListTasksOutput).TaskArns...)
}
// Early return: if we can't list tasks we have nothing to
// describe below - likely empty cluster/permissions are bad. This
// stops the AWS API from returning a 401 when you DescribeTasks
// with no input.
if len(taskArns) == 0 {
return []ecsInstance{}, nil
}
chunkedTaskArns := p.chunkedTaskArns(taskArns)
var tasks []*ecs.Task
for _, arns := range chunkedTaskArns {
req, taskResp := client.ecs.DescribeTasksRequest(&ecs.DescribeTasksInput{
Tasks: arns,
Cluster: &p.Cluster,
})
if err := wrapAws(ctx, req); err != nil {
return nil, err
}
tasks = append(tasks, taskResp.Tasks...)
}
containerInstanceArns := make([]*string, 0)
byContainerInstance := make(map[string]int)
taskDefinitionArns := make([]*string, 0)
byTaskDefinition := make(map[string]int)
for _, task := range tasks {
if _, found := byContainerInstance[*task.ContainerInstanceArn]; !found {
byContainerInstance[*task.ContainerInstanceArn] = len(containerInstanceArns)
containerInstanceArns = append(containerInstanceArns, task.ContainerInstanceArn)
}
if _, found := byTaskDefinition[*task.TaskDefinitionArn]; !found {
byTaskDefinition[*task.TaskDefinitionArn] = len(taskDefinitionArns)
taskDefinitionArns = append(taskDefinitionArns, task.TaskDefinitionArn)
}
}
machines, err := p.lookupEc2Instances(ctx, client, containerInstanceArns)
if err != nil {
return nil, err
}
taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, taskDefinitionArns)
if err != nil {
return nil, err
}
var instances []ecsInstance var instances []ecsInstance
for _, task := range tasks { var clustersArn []*string
var clusters Clusters
machineIdx := byContainerInstance[*task.ContainerInstanceArn] if p.AutoDiscoverClusters {
taskDefIdx := byTaskDefinition[*task.TaskDefinitionArn] input := &ecs.ListClustersInput{}
for {
for _, container := range task.Containers { result, err := client.ecs.ListClusters(input)
if err != nil {
taskDefinition := taskDefinitions[taskDefIdx] return nil, err
var containerDefinition *ecs.ContainerDefinition }
for _, def := range taskDefinition.ContainerDefinitions { if result != nil {
if *container.Name == *def.Name { clustersArn = append(clustersArn, result.ClusterArns...)
containerDefinition = def input.NextToken = result.NextToken
if result.NextToken == nil {
break break
} }
} else {
break
}
}
for _, carns := range clustersArn {
clusters = append(clusters, *carns)
}
} else if p.Cluster != "" {
// TODO: Deprecated configuration - Need to be removed in the future
clusters = Clusters{p.Cluster}
log.Warn("Deprecated configuration found: ecs.cluster " +
"Please use ecs.clusters instead.")
} else {
clusters = p.Clusters
}
log.Debugf("ECS Clusters: %s", clusters)
for _, c := range clusters {
req, _ := client.ecs.ListTasksRequest(&ecs.ListTasksInput{
Cluster: &c,
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
})
for ; req != nil; req = req.NextPage() {
if err := wrapAws(ctx, req); err != nil {
return nil, err
} }
instances = append(instances, ecsInstance{ taskArns = append(taskArns, req.Data.(*ecs.ListTasksOutput).TaskArns...)
fmt.Sprintf("%s-%s", strings.Replace(*task.Group, ":", "-", 1), *container.Name), }
(*task.TaskArn)[len(*task.TaskArn)-12:],
task, // Early return: if we can't list tasks we have nothing to
taskDefinition, // describe below - likely empty cluster/permissions are bad. This
container, // stops the AWS API from returning a 401 when you DescribeTasks
containerDefinition, // with no input.
machines[machineIdx], if len(taskArns) == 0 {
return []ecsInstance{}, nil
}
chunkedTaskArns := p.chunkedTaskArns(taskArns)
var tasks []*ecs.Task
for _, arns := range chunkedTaskArns {
req, taskResp := client.ecs.DescribeTasksRequest(&ecs.DescribeTasksInput{
Tasks: arns,
Cluster: &c,
}) })
if err := wrapAws(ctx, req); err != nil {
return nil, err
}
tasks = append(tasks, taskResp.Tasks...)
}
containerInstanceArns := make([]*string, 0)
byContainerInstance := make(map[string]int)
taskDefinitionArns := make([]*string, 0)
byTaskDefinition := make(map[string]int)
for _, task := range tasks {
if _, found := byContainerInstance[*task.ContainerInstanceArn]; !found {
byContainerInstance[*task.ContainerInstanceArn] = len(containerInstanceArns)
containerInstanceArns = append(containerInstanceArns, task.ContainerInstanceArn)
}
if _, found := byTaskDefinition[*task.TaskDefinitionArn]; !found {
byTaskDefinition[*task.TaskDefinitionArn] = len(taskDefinitionArns)
taskDefinitionArns = append(taskDefinitionArns, task.TaskDefinitionArn)
}
}
machines, err := p.lookupEc2Instances(ctx, client, &c, containerInstanceArns)
if err != nil {
return nil, err
}
taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, taskDefinitionArns)
if err != nil {
return nil, err
}
for _, task := range tasks {
machineIdx := byContainerInstance[*task.ContainerInstanceArn]
taskDefIdx := byTaskDefinition[*task.TaskDefinitionArn]
for _, container := range task.Containers {
taskDefinition := taskDefinitions[taskDefIdx]
var containerDefinition *ecs.ContainerDefinition
for _, def := range taskDefinition.ContainerDefinitions {
if *container.Name == *def.Name {
containerDefinition = def
break
}
}
instances = append(instances, ecsInstance{
fmt.Sprintf("%s-%s", strings.Replace(*task.Group, ":", "-", 1), *container.Name),
(*task.TaskArn)[len(*task.TaskArn)-12:],
task,
taskDefinition,
container,
containerDefinition,
machines[machineIdx],
})
}
} }
} }
return instances, nil return instances, nil
} }
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, containerArns []*string) ([]*ec2.Instance, error) { func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, containerArns []*string) ([]*ec2.Instance, error) {
order := make(map[string]int) order := make(map[string]int)
instanceIds := make([]*string, len(containerArns)) instanceIds := make([]*string, len(containerArns))
@ -307,7 +344,7 @@ func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, co
req, _ := client.ecs.DescribeContainerInstancesRequest(&ecs.DescribeContainerInstancesInput{ req, _ := client.ecs.DescribeContainerInstancesRequest(&ecs.DescribeContainerInstancesInput{
ContainerInstances: containerArns, ContainerInstances: containerArns,
Cluster: &p.Cluster, Cluster: clusterName,
}) })
for ; req != nil; req = req.NextPage() { for ; req != nil; req = req.NextPage() {

View file

@ -534,8 +534,9 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
var defaultECS ecs.Provider var defaultECS ecs.Provider
defaultECS.Watch = true defaultECS.Watch = true
defaultECS.ExposedByDefault = true defaultECS.ExposedByDefault = true
defaultECS.AutoDiscoverClusters = false
defaultECS.Clusters = ecs.Clusters{"default"}
defaultECS.RefreshSeconds = 15 defaultECS.RefreshSeconds = 15
defaultECS.Cluster = "default"
defaultECS.Constraints = types.Constraints{} defaultECS.Constraints = types.Constraints{}
//default Rancher //default Rancher

View file

@ -1095,11 +1095,17 @@
# ECS Cluster Name # ECS Cluster Name
# #
# Optional # Deprecated - Please use Clusters
# Default: "default"
# #
# Cluster = "default" # Cluster = "default"
# ECS Clusters Name
#
# Optional
# Default: ["default"]
#
# Clusters = ["default"]
# Enable watch ECS changes # Enable watch ECS changes
# #
# Optional # Optional