Check for explicitly defined Marathon port first.

Previously, we did the check too late resulting in the traefik.port
label not being effective.

The change comes with additional refactorings in production and tests.
This commit is contained in:
Timo Reimann 2017-04-20 22:05:21 +02:00
parent f1bc80ca12
commit 099d605aed
3 changed files with 290 additions and 228 deletions

View file

@ -2,6 +2,7 @@ package marathon
import (
"errors"
"fmt"
"math"
"net"
"net/http"
@ -22,6 +23,11 @@ import (
"github.com/gambol99/go-marathon"
)
const (
labelPort = "traefik.port"
labelPortIndex = "traefik.portIndex"
)
var _ provider.Provider = (*Provider)(nil)
// Provider holds configuration of the provider.
@ -194,14 +200,23 @@ func (p *Provider) loadMarathonConfig() *types.Configuration {
func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applications, exposedByDefaultFlag bool) bool {
application, err := getApplication(task, applications.Apps)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
log.Errorf("Unable to get Marathon application %s for task %s", task.AppID, task.ID)
return false
}
ports := processPorts(application, task)
if len(ports) == 0 {
log.Debug("Filtering marathon task without port %s", task.AppID)
if _, err = processPorts(application, task); err != nil {
log.Errorf("Filtering Marathon task %s from application %s without port: %s", task.ID, application.ID, err)
return false
}
// Filter illegal port label specification.
_, hasPortIndexLabel := p.getLabel(application, labelPortIndex)
_, hasPortLabel := p.getLabel(application, labelPort)
if hasPortIndexLabel && hasPortLabel {
log.Debugf("Filtering Marathon task %s from application %s specifying both traefik.portIndex and traefik.port labels", task.ID, application.ID)
return false
}
// Filter by constraints.
label, _ := p.getLabel(application, "traefik.tags")
constraintTags := strings.Split(label, ",")
if p.MarathonLBCompatibility {
@ -211,50 +226,29 @@ func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applica
}
if ok, failingConstraint := p.MatchConstraints(constraintTags); !ok {
if failingConstraint != nil {
log.Debugf("Application %v pruned by '%v' constraint", application.ID, failingConstraint.String())
log.Debugf("Filtering Marathon task %s from application %s pruned by '%v' constraint", task.ID, application.ID, failingConstraint.String())
}
return false
}
// Filter disabled application.
if !isApplicationEnabled(application, exposedByDefaultFlag) {
log.Debugf("Filtering disabled marathon task %s", task.AppID)
log.Debugf("Filtering disabled Marathon task %s from application %s", task.ID, application.ID)
return false
}
//filter indeterminable task port
portIndexLabel := (*application.Labels)["traefik.portIndex"]
portValueLabel := (*application.Labels)["traefik.port"]
if portIndexLabel != "" && portValueLabel != "" {
log.Debugf("Filtering marathon task %s specifying both traefik.portIndex and traefik.port labels", task.AppID)
return false
}
if portIndexLabel != "" {
index, err := strconv.Atoi((*application.Labels)["traefik.portIndex"])
if err != nil || index < 0 || index > len(ports)-1 {
log.Debugf("Filtering marathon task %s with unexpected value for traefik.portIndex label", task.AppID)
return false
}
}
if portValueLabel != "" {
_, err := strconv.Atoi((*application.Labels)["traefik.port"])
if err != nil {
log.Debugf("Filtering marathon task %s with unexpected value for traefik.port label", task.AppID)
return false
}
}
//filter healthchecks
// Filter task with existing, bad health check results.
if application.HasHealthChecks() {
if task.HasHealthCheckResults() {
for _, healthcheck := range task.HealthCheckResults {
// found one bad healthcheck, return false
if !healthcheck.Alive {
log.Debugf("Filtering marathon task %s with bad healthcheck", task.AppID)
log.Debugf("Filtering Marathon task %s from application %s with bad health check", task.ID, application.ID)
return false
}
}
}
}
return true
}
@ -303,23 +297,16 @@ func (p *Provider) getLabel(application marathon.Application, label string) (str
func (p *Provider) getPort(task marathon.Task, applications []marathon.Application) string {
application, err := getApplication(task, applications)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
log.Errorf("Unable to get Marathon application %s for task %s", application.ID, task.ID)
return ""
}
ports := processPorts(application, task)
if portIndexLabel, ok := p.getLabel(application, "traefik.portIndex"); ok {
if index, err := strconv.Atoi(portIndexLabel); err == nil {
return strconv.Itoa(ports[index])
}
}
if portValueLabel, ok := p.getLabel(application, "traefik.port"); ok {
return portValueLabel
port, err := processPorts(application, task)
if err != nil {
log.Errorf("Unable to process ports for Marathon application %s and task %s: %s", application.ID, task.ID, err)
return ""
}
for _, port := range ports {
return strconv.Itoa(port)
}
return ""
return strconv.Itoa(port)
}
func (p *Provider) getWeight(task marathon.Task, applications []marathon.Application) string {
@ -473,7 +460,39 @@ func (p *Provider) getCircuitBreakerExpression(application marathon.Application)
return "NetworkErrorRatio() > 1"
}
func processPorts(application marathon.Application, task marathon.Task) []int {
func processPorts(application marathon.Application, task marathon.Task) (int, error) {
if portLabel, ok := (*application.Labels)[labelPort]; ok {
port, err := strconv.Atoi(portLabel)
switch {
case err != nil:
return 0, fmt.Errorf("failed to parse port label: %s", err)
case port <= 0:
return 0, fmt.Errorf("explicitly specified port %d must be larger than zero", port)
}
return port, nil
}
ports := retrieveAvailablePorts(application, task)
if len(ports) == 0 {
return 0, errors.New("no port found")
}
portIndex := 0
portIndexLabel, ok := (*application.Labels)[labelPortIndex]
if ok {
var err error
portIndex, err = strconv.Atoi(portIndexLabel)
switch {
case err != nil:
return 0, fmt.Errorf("failed to parse port index label: %s", err)
case portIndex < 0, portIndex > len(ports)-1:
return 0, fmt.Errorf("port index %d must be within port range (0, %d)", portIndex, len(ports)-1)
}
}
return ports[portIndex], nil
}
func retrieveAvailablePorts(application marathon.Application, task marathon.Task) []int {
// Using default port configuration
if task.Ports != nil && len(task.Ports) > 0 {
return task.Ports

View file

@ -6,6 +6,7 @@ import (
"testing"
"github.com/containous/traefik/mocks"
"github.com/containous/traefik/testhelpers"
"github.com/containous/traefik/types"
"github.com/gambol99/go-marathon"
"github.com/stretchr/testify/mock"
@ -400,39 +401,30 @@ func TestMarathonTaskFilter(t *testing.T) {
},
{
task: marathon.Task{
AppID: "multiple-ports",
Ports: []int{80, 443},
AppID: "missing-port",
Ports: []int{},
},
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "multiple-ports",
Ports: []int{80, 443},
ID: "missing-port",
Labels: &map[string]string{},
},
},
},
expected: true,
expected: false,
exposedByDefault: true,
},
{
task: marathon.Task{
AppID: "ipAddressOnePort",
AppID: "existing-port",
Ports: []int{80},
},
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "ipAddressOnePort",
IPAddressPerTask: &marathon.IPAddressPerTask{
Discovery: &marathon.Discovery{
Ports: &[]marathon.Port{
{
Number: 8880,
Name: "p1",
},
},
},
},
ID: "existing-port",
Ports: []int{80},
Labels: &map[string]string{},
},
},
@ -440,64 +432,6 @@ func TestMarathonTaskFilter(t *testing.T) {
expected: true,
exposedByDefault: true,
},
{
task: marathon.Task{
AppID: "ipAddressTwoPortsUseFirst",
},
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "ipAddressTwoPortsUseFirst",
IPAddressPerTask: &marathon.IPAddressPerTask{
Discovery: &marathon.Discovery{
Ports: &[]marathon.Port{
{
Number: 8898,
Name: "p1",
}, {
Number: 9999,
Name: "p1",
},
},
},
},
Labels: &map[string]string{},
},
},
},
expected: true,
exposedByDefault: true,
},
{
task: marathon.Task{
AppID: "ipAddressValidTwoPorts",
},
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "ipAddressValidTwoPorts",
IPAddressPerTask: &marathon.IPAddressPerTask{
Discovery: &marathon.Discovery{
Ports: &[]marathon.Port{
{
Number: 8898,
Name: "p1",
}, {
Number: 9999,
Name: "p2",
},
},
},
},
Labels: &map[string]string{
"traefik.portIndex": "0",
},
},
},
},
expected: true,
exposedByDefault: true,
},
{
task: marathon.Task{
Ports: []int{80},
@ -516,62 +450,6 @@ func TestMarathonTaskFilter(t *testing.T) {
expected: false,
exposedByDefault: true,
},
{
task: marathon.Task{
AppID: "specify-port-number",
Ports: []int{80},
},
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "specify-port-number",
Labels: &map[string]string{
"traefik.port": "8080",
},
},
},
},
expected: true,
exposedByDefault: true,
},
{
task: marathon.Task{
AppID: "specify-port-index",
Ports: []int{80, 443},
},
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "specify-port-index",
Ports: []int{80, 443},
Labels: &map[string]string{
"traefik.portIndex": "0",
},
},
},
},
expected: true,
exposedByDefault: true,
},
{
task: marathon.Task{
AppID: "specify-out-of-range-port-index",
Ports: []int{80, 443},
},
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "specify-out-of-range-port-index",
Ports: []int{80, 443},
Labels: &map[string]string{
"traefik.portIndex": "2",
},
},
},
},
expected: false,
exposedByDefault: true,
},
{
task: marathon.Task{
AppID: "specify-both-port-index-and-number",
@ -594,13 +472,13 @@ func TestMarathonTaskFilter(t *testing.T) {
},
{
task: marathon.Task{
AppID: "foo",
AppID: "healthcheck-available",
Ports: []int{80},
},
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "foo",
ID: "healthcheck-available",
Ports: []int{80},
Labels: &map[string]string{},
HealthChecks: &[]marathon.HealthCheck{
@ -639,7 +517,7 @@ func TestMarathonTaskFilter(t *testing.T) {
},
{
task: marathon.Task{
AppID: "foo",
AppID: "healthcheck-mixed-results",
Ports: []int{80},
HealthCheckResults: []*marathon.HealthCheckResult{
{
@ -653,7 +531,7 @@ func TestMarathonTaskFilter(t *testing.T) {
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "foo",
ID: "healthcheck-mixed-results",
Ports: []int{80},
Labels: &map[string]string{},
HealthChecks: &[]marathon.HealthCheck{
@ -665,23 +543,6 @@ func TestMarathonTaskFilter(t *testing.T) {
expected: false,
exposedByDefault: true,
},
{
task: marathon.Task{
AppID: "single-port",
Ports: []int{80},
},
applications: &marathon.Applications{
Apps: []marathon.Application{
{
ID: "single-port",
Ports: []int{80},
Labels: &map[string]string{},
},
},
},
expected: true,
exposedByDefault: true,
},
{
task: marathon.Task{
AppID: "healthcheck-alive",
@ -981,16 +842,22 @@ func TestMarathonGetPort(t *testing.T) {
provider := &Provider{}
cases := []struct {
desc string
applications []marathon.Application
task marathon.Task
expected string
}{
{
desc: "no applications",
applications: []marathon.Application{},
task: marathon.Task{},
expected: "",
task: marathon.Task{
AppID: "app",
Ports: []int{80},
},
expected: "",
},
{
desc: "application mismatch",
applications: []marathon.Application{
{
ID: "test1",
@ -999,74 +866,240 @@ func TestMarathonGetPort(t *testing.T) {
},
task: marathon.Task{
AppID: "test2",
Ports: []int{80},
},
expected: "",
},
{
desc: "port missing",
applications: []marathon.Application{
{
ID: "test1",
ID: "app",
Labels: &map[string]string{},
},
},
task: marathon.Task{
AppID: "test1",
AppID: "app",
Ports: []int{},
},
expected: "",
},
{
desc: "explicit port taken",
applications: []marathon.Application{
{
ID: "app",
Labels: &map[string]string{
"traefik.port": "80",
},
},
},
task: marathon.Task{
AppID: "app",
Ports: []int{},
},
expected: "80",
},
{
desc: "illegal explicit port specified",
applications: []marathon.Application{
{
ID: "app",
Labels: &map[string]string{
"traefik.port": "foobar",
},
},
},
task: marathon.Task{
AppID: "app",
Ports: []int{80},
},
expected: "",
},
{
desc: "illegal explicit port integer specified",
applications: []marathon.Application{
{
ID: "app",
Labels: &map[string]string{
"traefik.port": "-1",
},
},
},
task: marathon.Task{
AppID: "app",
Ports: []int{80},
},
expected: "",
},
{
desc: "task port available",
applications: []marathon.Application{
{
ID: "app",
Labels: &map[string]string{},
PortDefinitions: &[]marathon.PortDefinition{
{
Port: testhelpers.Intp(443),
},
},
IPAddressPerTask: &marathon.IPAddressPerTask{
Discovery: &marathon.Discovery{
Ports: &[]marathon.Port{
{
Number: 8000,
},
},
},
},
},
},
task: marathon.Task{
AppID: "app",
Ports: []int{80},
},
expected: "80",
},
{
desc: "port mapping port available",
applications: []marathon.Application{
{
ID: "multiple-ports-take-first",
ID: "app",
Labels: &map[string]string{},
PortDefinitions: &[]marathon.PortDefinition{
{
Port: testhelpers.Intp(443),
},
},
IPAddressPerTask: &marathon.IPAddressPerTask{
Discovery: &marathon.Discovery{
Ports: &[]marathon.Port{
{
Number: 8000,
},
},
},
},
},
},
task: marathon.Task{
AppID: "app",
Ports: []int{},
},
expected: "443",
},
{
desc: "IP-per-task port available",
applications: []marathon.Application{
{
ID: "app",
Labels: &map[string]string{},
IPAddressPerTask: &marathon.IPAddressPerTask{
Discovery: &marathon.Discovery{
Ports: &[]marathon.Port{
{
Number: 8000,
},
},
},
},
},
},
task: marathon.Task{
AppID: "app",
Ports: []int{},
},
expected: "8000",
},
{
desc: "first port taken from multiple ports",
applications: []marathon.Application{
{
ID: "app",
Labels: &map[string]string{},
},
},
task: marathon.Task{
AppID: "multiple-ports-take-first",
AppID: "app",
Ports: []int{80, 443},
},
expected: "80",
},
{
desc: "indexed port taken",
applications: []marathon.Application{
{
ID: "specify-port-number",
Labels: &map[string]string{
"traefik.port": "443",
},
},
},
task: marathon.Task{
AppID: "specify-port-number",
Ports: []int{80, 443},
},
expected: "443",
},
{
applications: []marathon.Application{
{
ID: "specify-port-index",
ID: "app",
Labels: &map[string]string{
"traefik.portIndex": "1",
},
},
},
task: marathon.Task{
AppID: "specify-port-index",
AppID: "app",
Ports: []int{80, 443},
},
expected: "443",
}, {
},
{
desc: "illegal port index specified",
applications: []marathon.Application{
{
ID: "application-with-port",
ID: "app",
Labels: &map[string]string{
"traefik.portIndex": "foobar",
},
},
},
task: marathon.Task{
AppID: "app",
Ports: []int{80},
},
expected: "",
},
{
desc: "sub-zero port index specified",
applications: []marathon.Application{
{
ID: "app",
Labels: &map[string]string{
"traefik.portIndex": "-1",
},
},
},
task: marathon.Task{
AppID: "app",
Ports: []int{80},
},
expected: "",
},
{
desc: "too high port index specified",
applications: []marathon.Application{
{
ID: "app",
Labels: &map[string]string{
"traefik.portIndex": "42",
},
},
},
task: marathon.Task{
AppID: "app",
Ports: []int{80, 443},
},
expected: "",
},
{
desc: "task port preferred over application port",
applications: []marathon.Application{
{
ID: "app",
Ports: []int{9999},
Labels: &map[string]string{},
},
},
task: marathon.Task{
AppID: "application-with-port",
AppID: "app",
Ports: []int{7777},
},
expected: "7777",
@ -1074,14 +1107,18 @@ func TestMarathonGetPort(t *testing.T) {
}
for _, c := range cases {
actual := provider.getPort(c.task, c.applications)
if actual != c.expected {
t.Fatalf("expected %q, got %q", c.expected, actual)
}
c := c
t.Run(c.desc, func(t *testing.T) {
t.Parallel()
actual := provider.getPort(c.task, c.applications)
if actual != c.expected {
t.Errorf("got %q, want %q", c.expected, actual)
}
})
}
}
func TestMarathonGetWeigh(t *testing.T) {
func TestMarathonGetWeight(t *testing.T) {
provider := &Provider{}
applications := []struct {

6
testhelpers/helpers.go Normal file
View file

@ -0,0 +1,6 @@
package testhelpers
// Intp returns a pointer to the given integer value.
func Intp(i int) *int {
return &i
}