refactor(mesos) be testable.

This commit is contained in:
Fernandez Ludovic 2018-01-10 04:05:33 +01:00 committed by Traefiker
parent e9d2124885
commit 17137ba3e7
4 changed files with 466 additions and 302 deletions

View file

@ -6,56 +6,48 @@ import (
"strconv" "strconv"
"strings" "strings"
"text/template" "text/template"
"time"
"github.com/BurntSushi/ty/fun" "github.com/BurntSushi/ty/fun"
"github.com/containous/traefik/log" "github.com/containous/traefik/log"
"github.com/containous/traefik/provider" "github.com/containous/traefik/provider"
"github.com/containous/traefik/provider/label" "github.com/containous/traefik/provider/label"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/mesosphere/mesos-dns/records"
"github.com/mesosphere/mesos-dns/records/state" "github.com/mesosphere/mesos-dns/records/state"
) )
func (p *Provider) buildConfiguration() *types.Configuration { func (p *Provider) buildConfiguration(tasks []state.Task) *types.Configuration {
var mesosFuncMap = template.FuncMap{ var mesosFuncMap = template.FuncMap{
"getBackend": getBackend, "getDomain": getFuncStringValue(label.TraefikDomain, p.Domain),
"getPort": p.getPort, "getID": getID,
"getHost": p.getHost,
"getWeight": getFuncApplicationStringValue(label.TraefikWeight, label.DefaultWeight), // Backend functions
"getDomain": getFuncStringValue(label.TraefikDomain, p.Domain), "getProtocol": getFuncApplicationStringValue(label.TraefikProtocol, label.DefaultProtocol),
"getProtocol": getFuncApplicationStringValue(label.TraefikProtocol, label.DefaultProtocol), "getPort": p.getPort,
"getHost": p.getHost,
"getWeight": getFuncApplicationStringValue(label.TraefikWeight, label.DefaultWeight),
"getBackend": getBackend,
// Frontend functions
"getPassHostHeader": getFuncStringValue(label.TraefikFrontendPassHostHeader, label.DefaultPassHostHeader), "getPassHostHeader": getFuncStringValue(label.TraefikFrontendPassHostHeader, label.DefaultPassHostHeader),
"getPriority": getFuncStringValue(label.TraefikFrontendPriority, label.DefaultFrontendPriority), "getPriority": getFuncStringValue(label.TraefikFrontendPriority, label.DefaultFrontendPriority),
"getEntryPoints": getFuncSliceStringValue(label.TraefikFrontendEntryPoints), "getEntryPoints": getFuncSliceStringValue(label.TraefikFrontendEntryPoints),
"getFrontendRule": p.getFrontendRule, "getFrontendRule": p.getFrontendRule,
"getFrontendBackend": getFrontendBackend, "getFrontendBackend": getFrontendBackend,
"getID": getID,
"getFrontEndName": getFrontEndName, "getFrontEndName": getFrontEndName,
} }
rg := records.NewRecordGenerator(time.Duration(p.StateTimeoutSecond) * time.Second)
st, err := rg.FindMaster(p.Masters...)
if err != nil {
log.Errorf("Failed to create a client for Mesos, error: %v", err)
return nil
}
tasks := taskRecords(st)
// filter tasks // filter tasks
filteredTasks := fun.Filter(func(task state.Task) bool { filteredTasks := fun.Filter(func(task state.Task) bool {
return taskFilter(task, p.ExposedByDefault) return taskFilter(task, p.ExposedByDefault)
}, tasks).([]state.Task) }, tasks).([]state.Task)
uniqueApps := make(map[string]state.Task)
for _, value := range filteredTasks {
if _, ok := uniqueApps[value.DiscoveryInfo.Name]; !ok {
uniqueApps[value.DiscoveryInfo.Name] = value
}
}
var filteredApps []state.Task var filteredApps []state.Task
for _, value := range uniqueApps { uniqueApps := make(map[string]struct{})
filteredApps = append(filteredApps, value) for _, task := range filteredTasks {
if _, ok := uniqueApps[task.DiscoveryInfo.Name]; !ok {
uniqueApps[task.DiscoveryInfo.Name] = struct{}{}
filteredApps = append(filteredApps, task)
}
} }
templateObjects := struct { templateObjects := struct {
@ -75,31 +67,12 @@ func (p *Provider) buildConfiguration() *types.Configuration {
return configuration return configuration
} }
func taskRecords(st state.State) []state.Task {
var tasks []state.Task
for _, f := range st.Frameworks {
for _, task := range f.Tasks {
for _, slave := range st.Slaves {
if task.SlaveID == slave.ID {
task.SlaveIP = slave.PID.Host
}
}
// only do running and discoverable tasks
if task.State == "TASK_RUNNING" {
tasks = append(tasks, task)
}
}
}
return tasks
}
func taskFilter(task state.Task, exposedByDefaultFlag bool) bool { func taskFilter(task state.Task, exposedByDefaultFlag bool) bool {
if len(task.DiscoveryInfo.Ports.DiscoveryPorts) == 0 { if len(task.DiscoveryInfo.Ports.DiscoveryPorts) == 0 {
log.Debugf("Filtering Mesos task without port %s", task.Name) log.Debugf("Filtering Mesos task without port %s", task.Name)
return false return false
} }
if !isEnabled(task, exposedByDefaultFlag) { if !isEnabled(task, exposedByDefaultFlag) {
log.Debugf("Filtering disabled Mesos task %s", task.DiscoveryInfo.Name) log.Debugf("Filtering disabled Mesos task %s", task.DiscoveryInfo.Name)
return false return false
@ -140,7 +113,7 @@ func taskFilter(task state.Task, exposedByDefaultFlag bool) bool {
} }
} }
//filter healthChecks // filter healthChecks
if task.Statuses != nil && len(task.Statuses) > 0 && task.Statuses[0].Healthy != nil && !*task.Statuses[0].Healthy { if task.Statuses != nil && len(task.Statuses) > 0 && task.Statuses[0].Healthy != nil && !*task.Statuses[0].Healthy {
log.Debugf("Filtering Mesos task %s with bad healthCheck", task.DiscoveryInfo.Name) log.Debugf("Filtering Mesos task %s with bad healthCheck", task.DiscoveryInfo.Name)
return false return false
@ -166,7 +139,7 @@ func getFrontendBackend(task state.Task) string {
if value := getStringValue(task, label.TraefikBackend, ""); len(value) > 0 { if value := getStringValue(task, label.TraefikBackend, ""); len(value) > 0 {
return value return value
} }
return "-" + provider.Normalize(task.DiscoveryInfo.Name) return provider.Normalize(task.DiscoveryInfo.Name)
} }
func getFrontEndName(task state.Task) string { func getFrontEndName(task state.Task) string {

View file

@ -1,8 +1,6 @@
package mesos package mesos
import ( import (
"reflect"
"strconv"
"testing" "testing"
"github.com/containous/traefik/provider/label" "github.com/containous/traefik/provider/label"
@ -10,238 +8,362 @@ import (
"github.com/mesos/mesos-go/upid" "github.com/mesos/mesos-go/upid"
"github.com/mesosphere/mesos-dns/records/state" "github.com/mesosphere/mesos-dns/records/state"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
// FIXME fill this test!!
func TestBuildConfiguration(t *testing.T) { func TestBuildConfiguration(t *testing.T) {
cases := []struct { p := &Provider{
applicationsError bool Domain: "docker.localhost",
tasksError bool ExposedByDefault: true,
mesosTask state.Task IPSources: "host",
expected bool }
exposedByDefault bool
expectedNil bool testCases := []struct {
desc string
tasks []state.Task
expectedFrontends map[string]*types.Frontend expectedFrontends map[string]*types.Frontend
expectedBackends map[string]*types.Backend expectedBackends map[string]*types.Backend
}{} }{
{
desc: "should return an empty configuration when no task",
tasks: []state.Task{},
expectedFrontends: map[string]*types.Frontend{},
expectedBackends: map[string]*types.Backend{},
},
{
desc: "2 applications with 2 tasks",
tasks: []state.Task{
// App 1
aTask("ID1",
withIP("10.10.10.10"),
withInfo("name1",
withPorts(withPort("TCP", 80, "WEB"))),
withStatus(withHealthy(true), withState("TASK_RUNNING")),
),
aTask("ID2",
withIP("10.10.10.11"),
withInfo("name1",
withPorts(withPort("TCP", 81, "WEB"))),
withStatus(withHealthy(true), withState("TASK_RUNNING")),
),
// App 2
aTask("ID3",
withIP("20.10.10.10"),
withInfo("name2",
withPorts(withPort("TCP", 80, "WEB"))),
withStatus(withHealthy(true), withState("TASK_RUNNING")),
),
aTask("ID4",
withIP("20.10.10.11"),
withInfo("name2",
withPorts(withPort("TCP", 81, "WEB"))),
withStatus(withHealthy(true), withState("TASK_RUNNING")),
),
},
expectedFrontends: map[string]*types.Frontend{
"frontend-ID1": {
Backend: "backend-name1",
EntryPoints: []string{},
PassHostHeader: true,
Routes: map[string]types.Route{
"route-host-ID1": {
Rule: "Host:name1.docker.localhost",
},
},
},
"frontend-ID3": {
Backend: "backend-name2",
EntryPoints: []string{},
PassHostHeader: true,
Routes: map[string]types.Route{
"route-host-ID3": {
Rule: "Host:name2.docker.localhost",
},
},
},
},
expectedBackends: map[string]*types.Backend{
"backend-name1": {
Servers: map[string]types.Server{
"server-ID1": {
URL: "http://10.10.10.10:80",
Weight: 0,
},
"server-ID2": {
URL: "http://10.10.10.11:81",
Weight: 0,
},
},
},
"backend-name2": {
Servers: map[string]types.Server{
"server-ID3": {
URL: "http://20.10.10.10:80",
Weight: 0,
},
"server-ID4": {
URL: "http://20.10.10.11:81",
Weight: 0,
},
},
},
},
},
{
desc: "with all labels",
tasks: []state.Task{
aTask("ID1",
withLabel(label.TraefikPort, "666"),
withLabel(label.TraefikProtocol, "https"),
withLabel(label.TraefikWeight, "12"),
for _, c := range cases { withLabel(label.TraefikBackend, "foobar"),
provider := &Provider{
Domain: "docker.localhost", withLabel(label.TraefikFrontendEntryPoints, "http,https"),
ExposedByDefault: true, withLabel(label.TraefikFrontendPassHostHeader, "true"),
} withLabel(label.TraefikFrontendPassTLSCert, "true"),
actualConfig := provider.buildConfiguration() withLabel(label.TraefikFrontendPriority, "666"),
if c.expectedNil { withLabel(label.TraefikFrontendRule, "Host:traefik.io"),
if actualConfig != nil {
t.Fatalf("Should have been nil, got %v", actualConfig) withIP("10.10.10.10"),
} withInfo("name1", withPorts(
} else { withPortTCP(80, "n"),
// Compare backends withPortTCP(666, "n"))),
if !reflect.DeepEqual(actualConfig.Backends, c.expectedBackends) { withStatus(withHealthy(true), withState("TASK_RUNNING")),
t.Fatalf("Expected %#v, got %#v", c.expectedBackends, actualConfig.Backends) ),
} },
if !reflect.DeepEqual(actualConfig.Frontends, c.expectedFrontends) { expectedFrontends: map[string]*types.Frontend{
t.Fatalf("Expected %#v, got %#v", c.expectedFrontends, actualConfig.Frontends) "frontend-ID1": {
} EntryPoints: []string{
} "http",
"https",
},
Backend: "backend-foobar",
Routes: map[string]types.Route{
"route-host-ID1": {
Rule: "Host:traefik.io",
},
},
PassHostHeader: true,
Priority: 666,
},
},
expectedBackends: map[string]*types.Backend{
"backend-foobar": {
Servers: map[string]types.Server{
"server-ID1": {
URL: "https://10.10.10.10:666",
Weight: 12,
},
},
},
},
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
actualConfig := p.buildConfiguration(test.tasks)
require.NotNil(t, actualConfig)
assert.Equal(t, test.expectedBackends, actualConfig.Backends)
assert.Equal(t, test.expectedFrontends, actualConfig.Frontends)
})
} }
} }
func TestTaskFilter(t *testing.T) { func TestTaskFilter(t *testing.T) {
testCases := []struct { testCases := []struct {
desc string
mesosTask state.Task mesosTask state.Task
expected bool
exposedByDefault bool exposedByDefault bool
expected bool
}{ }{
{ {
desc: "no task",
mesosTask: state.Task{}, mesosTask: state.Task{},
exposedByDefault: true,
expected: false, expected: false,
exposedByDefault: true,
}, },
{ {
mesosTask: task(statuses(status(setState("TASK_RUNNING")))), desc: "task not healthy",
mesosTask: aTask("test", withStatus(withState("TASK_RUNNING"))),
exposedByDefault: true,
expected: false, expected: false,
exposedByDefault: true,
}, },
{ {
mesosTask: task( desc: "exposedByDefault false and traefik.enable false",
statuses( mesosTask: aTask("test",
status( withDefaultStatus(),
setState("TASK_RUNNING"), withLabel(label.TraefikEnable, "false"),
setHealthy(true))), withInfo("test", withPorts(withPortTCP(80, "WEB"))),
setLabels(label.TraefikEnable, "false"),
discovery(setDiscoveryPort("TCP", 80, "WEB")),
), ),
expected: false, // because label traefik.enable = false
exposedByDefault: false, exposedByDefault: false,
expected: false,
}, },
{ {
mesosTask: task( desc: "traefik.enable = true",
statuses( mesosTask: aTask("test",
status( withDefaultStatus(),
setState("TASK_RUNNING"), withLabel(label.TraefikEnable, "true"),
setHealthy(true))), withInfo("test", withPorts(withPortTCP(80, "WEB"))),
setLabels(label.TraefikEnable, "true"),
discovery(setDiscoveryPort("TCP", 80, "WEB")),
), ),
expected: true,
exposedByDefault: false, exposedByDefault: false,
},
{
mesosTask: task(
statuses(
status(
setState("TASK_RUNNING"),
setHealthy(true))),
setLabels(label.TraefikEnable, "true"),
discovery(setDiscoveryPort("TCP", 80, "WEB")),
),
expected: true, expected: true,
exposedByDefault: true,
}, },
{ {
mesosTask: task( desc: "exposedByDefault true and traefik.enable true",
statuses( mesosTask: aTask("test",
status( withDefaultStatus(),
setState("TASK_RUNNING"), withLabel(label.TraefikEnable, "true"),
setHealthy(true))), withInfo("test", withPorts(withPortTCP(80, "WEB"))),
setLabels(label.TraefikEnable, "false"),
discovery(setDiscoveryPort("TCP", 80, "WEB")),
), ),
expected: false, // because label traefik.enable = false (even wherek exposedByDefault = true)
exposedByDefault: true, exposedByDefault: true,
},
{
mesosTask: task(
statuses(
status(
setState("TASK_RUNNING"),
setHealthy(true))),
setLabels(label.TraefikEnable, "true",
label.TraefikPortIndex, "1",
label.TraefikPort, "80"),
discovery(setDiscoveryPort("TCP", 80, "WEB")),
),
expected: false, // traefik.portIndex & traefik.port cannot be set both
exposedByDefault: true,
},
{
mesosTask: task(
statuses(
status(
setState("TASK_RUNNING"),
setHealthy(true))),
setLabels(label.TraefikEnable, "true",
label.TraefikPortIndex, "1"),
discovery(setDiscoveryPorts("TCP", 80, "WEB HTTP", "TCP", 443, "WEB HTTPS")),
),
expected: true, expected: true,
exposedByDefault: true,
}, },
{ {
mesosTask: task( desc: "exposedByDefault true and traefik.enable false",
statuses( mesosTask: aTask("test",
status( withDefaultStatus(),
setState("TASK_RUNNING"), withLabel(label.TraefikEnable, "false"),
setHealthy(true))), withInfo("test", withPorts(withPortTCP(80, "WEB"))),
setLabels(label.TraefikEnable, "true"),
discovery(setDiscoveryPorts("TCP", 80, "WEB HTTP", "TCP", 443, "WEB HTTPS")),
), ),
expected: true, // Default to first index
exposedByDefault: true, exposedByDefault: true,
expected: false,
}, },
{ {
mesosTask: task( desc: "traefik.portIndex and traefik.port both set",
statuses( mesosTask: aTask("test",
status( withDefaultStatus(),
setState("TASK_RUNNING"), withLabel(label.TraefikEnable, "true"),
setHealthy(true))), withLabel(label.TraefikPortIndex, "1"),
setLabels(label.TraefikEnable, "true", withLabel(label.TraefikEnable, "80"),
label.TraefikPortIndex, "1"), withInfo("test", withPorts(withPortTCP(80, "WEB"))),
discovery(setDiscoveryPort("TCP", 80, "WEB")),
), ),
expected: false, // traefik.portIndex and discoveryPorts don't correspond
exposedByDefault: true, exposedByDefault: true,
}, { expected: false,
mesosTask: task( },
statuses( {
status( desc: "valid traefik.portIndex",
setState("TASK_RUNNING"), mesosTask: aTask("test",
setHealthy(true))), withDefaultStatus(),
setLabels(label.TraefikEnable, "true", withLabel(label.TraefikEnable, "true"),
label.TraefikPortIndex, "0"), withLabel(label.TraefikPortIndex, "1"),
discovery(setDiscoveryPort("TCP", 80, "WEB")), withInfo("test", withPorts(
withPortTCP(80, "WEB"),
withPortTCP(443, "WEB HTTPS"),
)),
), ),
expected: true, // traefik.portIndex and discoveryPorts correspond
exposedByDefault: true, exposedByDefault: true,
}, { expected: true,
mesosTask: task( },
statuses( {
status( desc: "default to first port index",
setState("TASK_RUNNING"), mesosTask: aTask("test",
setHealthy(true))), withDefaultStatus(),
setLabels(label.TraefikEnable, "true", withLabel(label.TraefikEnable, "true"),
label.TraefikPort, "TRAEFIK"), withInfo("test", withPorts(
discovery(setDiscoveryPort("TCP", 80, "WEB")), withPortTCP(80, "WEB"),
withPortTCP(443, "WEB HTTPS"),
)),
), ),
expected: false, // traefik.port is not an integer
exposedByDefault: true, exposedByDefault: true,
}, { expected: true,
mesosTask: task( },
statuses( {
status( desc: "traefik.portIndex and discoveryPorts don't correspond",
setState("TASK_RUNNING"), mesosTask: aTask("test",
setHealthy(true))), withDefaultStatus(),
setLabels(label.TraefikEnable, "true", withLabel(label.TraefikEnable, "true"),
label.TraefikPort, "443"), withLabel(label.TraefikPortIndex, "1"),
discovery(setDiscoveryPort("TCP", 80, "WEB")), withInfo("test", withPorts(withPortTCP(80, "WEB"))),
), ),
expected: false, // traefik.port is not the same as discovery.port
exposedByDefault: true, exposedByDefault: true,
}, { expected: false,
mesosTask: task( },
statuses( {
status( desc: "traefik.portIndex and discoveryPorts correspond",
setState("TASK_RUNNING"), mesosTask: aTask("test",
setHealthy(true))), withDefaultStatus(),
setLabels(label.TraefikEnable, "true", withLabel(label.TraefikEnable, "true"),
label.TraefikPort, "80"), withLabel(label.TraefikPortIndex, "0"),
discovery(setDiscoveryPort("TCP", 80, "WEB")), withInfo("test", withPorts(withPortTCP(80, "WEB"))),
), ),
expected: true, // traefik.port is the same as discovery.port
exposedByDefault: true, exposedByDefault: true,
}, { expected: true,
mesosTask: task( },
statuses( {
status( desc: "traefik.port is not an integer",
setState("TASK_RUNNING"))), mesosTask: aTask("test",
setLabels(label.TraefikEnable, "true", withDefaultStatus(),
label.TraefikPort, "80"), withLabel(label.TraefikEnable, "true"),
discovery(setDiscoveryPort("TCP", 80, "WEB")), withLabel(label.TraefikPort, "TRAEFIK"),
withInfo("test", withPorts(withPortTCP(80, "WEB"))),
), ),
expected: true, // No healthCheck
exposedByDefault: true, exposedByDefault: true,
}, { expected: false,
mesosTask: task( },
statuses( {
status( desc: "traefik.port is not the same as discovery.port",
setState("TASK_RUNNING"), mesosTask: aTask("test",
setHealthy(false))), withDefaultStatus(),
setLabels(label.TraefikEnable, "true", withLabel(label.TraefikEnable, "true"),
label.TraefikPort, "80"), withLabel(label.TraefikPort, "443"),
discovery(setDiscoveryPort("TCP", 80, "WEB")), withInfo("test", withPorts(withPortTCP(80, "WEB"))),
), ),
expected: false, // HealthCheck at false
exposedByDefault: true, exposedByDefault: true,
expected: false,
},
{
desc: "traefik.port is the same as discovery.port",
mesosTask: aTask("test",
withDefaultStatus(),
withLabel(label.TraefikEnable, "true"),
withLabel(label.TraefikPort, "80"),
withInfo("test", withPorts(withPortTCP(80, "WEB"))),
),
exposedByDefault: true,
expected: true,
},
{
desc: "healthy nil",
mesosTask: aTask("test",
withStatus(
withState("TASK_RUNNING"),
),
withLabel(label.TraefikEnable, "true"),
withLabel(label.TraefikPort, "80"),
withInfo("test", withPorts(withPortTCP(80, "WEB"))),
),
exposedByDefault: true,
expected: true,
},
{
desc: "healthy false",
mesosTask: aTask("test",
withStatus(
withState("TASK_RUNNING"),
withHealthy(false),
),
withLabel(label.TraefikEnable, "true"),
withLabel(label.TraefikPort, "80"),
withInfo("test", withPorts(withPortTCP(80, "WEB"))),
),
exposedByDefault: true,
expected: false,
}, },
} }
for index, test := range testCases { for _, test := range testCases {
t.Run(strconv.Itoa(index), func(t *testing.T) { test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel() t.Parallel()
actual := taskFilter(test.mesosTask, test.exposedByDefault) actual := taskFilter(test.mesosTask, test.exposedByDefault)
if actual != test.expected { ok := assert.Equal(t, test.expected, actual)
if !ok {
t.Logf("Statuses : %v", test.mesosTask.Statuses) t.Logf("Statuses : %v", test.mesosTask.Statuses)
t.Logf("Label : %v", test.mesosTask.Labels) t.Logf("Label : %v", test.mesosTask.Labels)
t.Logf("DiscoveryInfo : %v", test.mesosTask.DiscoveryInfo) t.Logf("DiscoveryInfo : %v", test.mesosTask.DiscoveryInfo)
@ -303,7 +425,7 @@ func TestGetSubDomain(t *testing.T) {
for _, test := range testCases { for _, test := range testCases {
test := test test := test
t.Run("", func(t *testing.T) { t.Run(test.path, func(t *testing.T) {
t.Parallel() t.Parallel()
actual := test.provider.getSubDomain(test.path) actual := test.provider.getSubDomain(test.path)

View file

@ -12,6 +12,9 @@ import (
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"github.com/mesos/mesos-go/detector" "github.com/mesos/mesos-go/detector"
"github.com/mesosphere/mesos-dns/records"
"github.com/mesosphere/mesos-dns/records/state"
// Register mesos zoo the detector // Register mesos zoo the detector
_ "github.com/mesos/mesos-go/detector/zoo" _ "github.com/mesos/mesos-go/detector/zoo"
"github.com/mesosphere/mesos-dns/detect" "github.com/mesosphere/mesos-dns/detect"
@ -76,7 +79,8 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
for { for {
select { select {
case <-reload.C: case <-reload.C:
configuration := p.buildConfiguration() tasks := p.getTasks()
configuration := p.buildConfiguration(tasks)
if configuration != nil { if configuration != nil {
configurationChan <- types.ConfigMessage{ configurationChan <- types.ConfigMessage{
ProviderName: "mesos", ProviderName: "mesos",
@ -92,7 +96,8 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
} }
log.Debugf("new masters detected: %v", masters) log.Debugf("new masters detected: %v", masters)
p.Masters = masters p.Masters = masters
configuration := p.buildConfiguration() tasks := p.getTasks()
configuration := p.buildConfiguration(tasks)
if configuration != nil { if configuration != nil {
configurationChan <- types.ConfigMessage{ configurationChan <- types.ConfigMessage{
ProviderName: "mesos", ProviderName: "mesos",
@ -129,3 +134,35 @@ func detectMasters(zk string, masters []string) <-chan []string {
} }
return changed return changed
} }
func (p *Provider) getTasks() []state.Task {
rg := records.NewRecordGenerator(time.Duration(p.StateTimeoutSecond) * time.Second)
st, err := rg.FindMaster(p.Masters...)
if err != nil {
log.Errorf("Failed to create a client for Mesos, error: %v", err)
return nil
}
return taskRecords(st)
}
func taskRecords(st state.State) []state.Task {
var tasks []state.Task
for _, f := range st.Frameworks {
for _, task := range f.Tasks {
for _, slave := range st.Slaves {
if task.SlaveID == slave.ID {
task.SlaveIP = slave.PID.Host
}
}
// only do running and discoverable tasks
if task.State == "TASK_RUNNING" {
tasks = append(tasks, task)
}
}
}
return tasks
}

View file

@ -1,113 +1,145 @@
package mesos package mesos
import ( import (
"github.com/containous/traefik/log" "testing"
"github.com/mesosphere/mesos-dns/records/state" "github.com/mesosphere/mesos-dns/records/state"
"github.com/stretchr/testify/assert"
) )
// test helpers // test helpers
type ( func TestBuilder(t *testing.T) {
taskOpt func(*state.Task) result := aTask("ID1",
statusOpt func(*state.Status) withIP("10.10.10.10"),
) withLabel("foo", "bar"),
withLabel("fii", "bar"),
withLabel("fuu", "bar"),
withInfo("name1",
withPorts(withPort("TCP", 80, "p"),
withPortTCP(81, "n"))),
withStatus(withHealthy(true), withState("a")))
func task(opts ...taskOpt) state.Task { expected := state.Task{
var t state.Task FrameworkID: "",
for _, opt := range opts { ID: "ID1",
opt(&t) SlaveIP: "10.10.10.10",
} Name: "",
return t SlaveID: "",
State: "",
Statuses: []state.Status{{
State: "a",
Healthy: Bool(true),
ContainerStatus: state.ContainerStatus{},
}},
DiscoveryInfo: state.DiscoveryInfo{
Name: "name1",
Labels: struct {
Labels []state.Label "json:\"labels\""
}{},
Ports: state.Ports{DiscoveryPorts: []state.DiscoveryPort{
{Protocol: "TCP", Number: 80, Name: "p"},
{Protocol: "TCP", Number: 81, Name: "n"}}}},
Labels: []state.Label{
{Key: "foo", Value: "bar"},
{Key: "fii", Value: "bar"},
{Key: "fuu", Value: "bar"}}}
assert.Equal(t, expected, result)
} }
func statuses(st ...state.Status) taskOpt { func aTask(id string, ops ...func(*state.Task)) state.Task {
return func(t *state.Task) { ts := &state.Task{ID: id}
t.Statuses = append(t.Statuses, st...) for _, op := range ops {
op(ts)
}
return *ts
}
func withIP(ip string) func(*state.Task) {
return func(task *state.Task) {
task.SlaveIP = ip
} }
} }
func discovery(dp state.DiscoveryInfo) taskOpt { func withInfo(name string, ops ...func(*state.DiscoveryInfo)) func(*state.Task) {
return func(t *state.Task) { return func(task *state.Task) {
t.DiscoveryInfo = dp info := &state.DiscoveryInfo{Name: name}
for _, op := range ops {
op(info)
}
task.DiscoveryInfo = *info
} }
} }
func setLabels(kvs ...string) taskOpt { func withPorts(ops ...func(port *state.DiscoveryPort)) func(*state.DiscoveryInfo) {
return func(t *state.Task) { return func(info *state.DiscoveryInfo) {
if len(kvs)%2 != 0 { var ports []state.DiscoveryPort
panic("odd number") for _, op := range ops {
pt := &state.DiscoveryPort{}
op(pt)
ports = append(ports, *pt)
} }
for i := 0; i < len(kvs); i += 2 { info.Ports = state.Ports{
var label = state.Label{Key: kvs[i], Value: kvs[i+1]} DiscoveryPorts: ports,
log.Debugf("Label1.1 : %v", label)
t.Labels = append(t.Labels, label)
log.Debugf("Label1.2 : %v", t.Labels)
} }
} }
} }
func status(opts ...statusOpt) state.Status { func withPort(proto string, port int, name string) func(port *state.DiscoveryPort) {
var s state.Status return func(p *state.DiscoveryPort) {
for _, opt := range opts { p.Protocol = proto
opt(&s) p.Number = port
} p.Name = name
return s
}
func setDiscoveryPort(proto string, port int, name string) state.DiscoveryInfo {
dp := state.DiscoveryPort{
Protocol: proto,
Number: port,
Name: name,
}
discoveryPorts := []state.DiscoveryPort{dp}
ports := state.Ports{
DiscoveryPorts: discoveryPorts,
}
return state.DiscoveryInfo{
Ports: ports,
} }
} }
func setDiscoveryPorts(proto1 string, port1 int, name1 string, proto2 string, port2 int, name2 string) state.DiscoveryInfo { func withPortTCP(port int, name string) func(port *state.DiscoveryPort) {
return withPort("TCP", port, name)
}
dp1 := state.DiscoveryPort{ func withStatus(ops ...func(*state.Status)) func(*state.Task) {
Protocol: proto1, return func(task *state.Task) {
Number: port1, st := &state.Status{}
Name: name1, for _, op := range ops {
op(st)
}
task.Statuses = append(task.Statuses, *st)
} }
}
dp2 := state.DiscoveryPort{ func withDefaultStatus(ops ...func(*state.Status)) func(*state.Task) {
Protocol: proto2, return func(task *state.Task) {
Number: port2, for _, op := range ops {
Name: name2, st := &state.Status{
} State: "TASK_RUNNING",
Healthy: Bool(true),
discoveryPorts := []state.DiscoveryPort{dp1, dp2} }
op(st)
ports := state.Ports{ task.Statuses = append(task.Statuses, *st)
DiscoveryPorts: discoveryPorts, }
}
return state.DiscoveryInfo{
Ports: ports,
} }
} }
func setState(st string) statusOpt { func withHealthy(st bool) func(*state.Status) {
return func(s *state.Status) { return func(status *state.Status) {
s.State = st status.Healthy = Bool(st)
} }
} }
func setHealthy(b bool) statusOpt { func withState(st string) func(*state.Status) {
return func(s *state.Status) { return func(status *state.Status) {
s.Healthy = &b status.State = st
} }
} }
func withLabel(key, value string) func(*state.Task) {
return func(task *state.Task) {
lbl := state.Label{Key: key, Value: value}
task.Labels = append(task.Labels, lbl)
}
}
func Bool(v bool) *bool {
return &v
}