[marathon] Use single API call to fetch Marathon resources.
Change Marathon provider to make just one API call instead of two per configuration update by means of specifying embedded resources, which enable retrieving multiple response types from the API at once. Apart from the obvious savings in API calls, we primarily gain a consistent view on both applications and tasks that allows us to drop a lot of correlation logic. Additionally, it will serve as the basis for the introduction of readiness checks which require application/task consistency for correct leverage on the proxy end. Additional changes: marathon.go: - Filter on tasks now embedded inside the applications. - Reduce/simplify signature on multiple template functions as we do not need to check for proper application/task correlation anymore. - Remove getFrontendBackend in favor of just getBackend. - Move filtering on enabled/exposed applications from `taskFilter` to `applicationFilter`. (The task filter just reached out to the applications anyway, so it never made sense to locate it with the tasks where the filter was called once for every task even though the result would never change.) - Remove duplicate constraints filter in tasks, where it neither made sense to keep as it operates on the application level only. - Add context to rendering error. marathon_test.go: - Simplify and reduce numerous tests. - Convert tests with high number of cases into parallelized sub-tests. - Improve readability/structure for several tests. - Add missing test for enabled/exposed applications. - Simplify the mocked Marathon server. marathon.tmpl: - Update application/task iteration. - Replace `getFrontendBackend` by `getBackend`.
This commit is contained in:
parent
58ffea6627
commit
779eeba650
3 changed files with 438 additions and 875 deletions
|
@ -26,6 +26,7 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
traceMaxScanTokenSize = 1024 * 1024
|
traceMaxScanTokenSize = 1024 * 1024
|
||||||
|
taskStateRunning = "TASK_RUNNING"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ provider.Provider = (*Provider)(nil)
|
var _ provider.Provider = (*Provider)(nil)
|
||||||
|
@ -54,7 +55,6 @@ type Basic struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type lightMarathonClient interface {
|
type lightMarathonClient interface {
|
||||||
AllTasks(v url.Values) (*marathon.Tasks, error)
|
|
||||||
Applications(url.Values) (*marathon.Applications, error)
|
Applications(url.Values) (*marathon.Applications, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +152,6 @@ func (p *Provider) loadMarathonConfig() *types.Configuration {
|
||||||
"getPriority": p.getPriority,
|
"getPriority": p.getPriority,
|
||||||
"getEntryPoints": p.getEntryPoints,
|
"getEntryPoints": p.getEntryPoints,
|
||||||
"getFrontendRule": p.getFrontendRule,
|
"getFrontendRule": p.getFrontendRule,
|
||||||
"getFrontendBackend": p.getFrontendBackend,
|
|
||||||
"hasCircuitBreakerLabels": p.hasCircuitBreakerLabels,
|
"hasCircuitBreakerLabels": p.hasCircuitBreakerLabels,
|
||||||
"hasLoadBalancerLabels": p.hasLoadBalancerLabels,
|
"hasLoadBalancerLabels": p.hasLoadBalancerLabels,
|
||||||
"hasMaxConnLabels": p.hasMaxConnLabels,
|
"hasMaxConnLabels": p.hasMaxConnLabels,
|
||||||
|
@ -167,52 +166,67 @@ func (p *Provider) loadMarathonConfig() *types.Configuration {
|
||||||
"getBasicAuth": p.getBasicAuth,
|
"getBasicAuth": p.getBasicAuth,
|
||||||
}
|
}
|
||||||
|
|
||||||
applications, err := p.marathonClient.Applications(nil)
|
v := url.Values{}
|
||||||
|
v.Add("embed", "apps.tasks")
|
||||||
|
applications, err := p.marathonClient.Applications(v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to retrieve applications from Marathon, error: %s", err)
|
log.Errorf("Failed to retrieve Marathon applications: %s", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks, err := p.marathonClient.AllTasks(&marathon.AllTasksOpts{Status: "running"})
|
filteredApps := fun.Filter(p.applicationFilter, applications.Apps).([]marathon.Application)
|
||||||
if err != nil {
|
for _, app := range filteredApps {
|
||||||
log.Errorf("Failed to retrieve task from Marathon, error: %s", err)
|
app.Tasks = fun.Filter(func(task *marathon.Task) bool {
|
||||||
return nil
|
return p.taskFilter(*task, app)
|
||||||
|
}, app.Tasks).([]*marathon.Task)
|
||||||
}
|
}
|
||||||
|
|
||||||
//filter tasks
|
|
||||||
filteredTasks := fun.Filter(func(task marathon.Task) bool {
|
|
||||||
return p.taskFilter(task, applications, p.ExposedByDefault)
|
|
||||||
}, tasks.Tasks).([]marathon.Task)
|
|
||||||
|
|
||||||
//filter apps
|
|
||||||
filteredApps := fun.Filter(func(app marathon.Application) bool {
|
|
||||||
return p.applicationFilter(app, filteredTasks)
|
|
||||||
}, applications.Apps).([]marathon.Application)
|
|
||||||
|
|
||||||
templateObjects := struct {
|
templateObjects := struct {
|
||||||
Applications []marathon.Application
|
Applications []marathon.Application
|
||||||
Tasks []marathon.Task
|
|
||||||
Domain string
|
Domain string
|
||||||
}{
|
}{
|
||||||
filteredApps,
|
filteredApps,
|
||||||
filteredTasks,
|
|
||||||
p.Domain,
|
p.Domain,
|
||||||
}
|
}
|
||||||
|
|
||||||
configuration, err := p.GetConfiguration("templates/marathon.tmpl", MarathonFuncMap, templateObjects)
|
configuration, err := p.GetConfiguration("templates/marathon.tmpl", MarathonFuncMap, templateObjects)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Errorf("failed to render Marathon configuration template: %s", err)
|
||||||
}
|
}
|
||||||
return configuration
|
return configuration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applications, exposedByDefaultFlag bool) bool {
|
func (p *Provider) applicationFilter(app marathon.Application) bool {
|
||||||
application, err := getApplication(task, applications.Apps)
|
// Filter disabled application.
|
||||||
if err != nil {
|
if !isApplicationEnabled(app, p.ExposedByDefault) {
|
||||||
log.Errorf("Unable to get Marathon application %s for task %s", task.AppID, task.ID)
|
log.Debugf("Filtering disabled Marathon application %s", app.ID)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if _, err = processPorts(application, task); err != nil {
|
|
||||||
|
// Filter by constraints.
|
||||||
|
label, _ := p.getLabel(app, types.LabelTags)
|
||||||
|
constraintTags := strings.Split(label, ",")
|
||||||
|
if p.MarathonLBCompatibility {
|
||||||
|
if label, ok := p.getLabel(app, "HAPROXY_GROUP"); ok {
|
||||||
|
constraintTags = append(constraintTags, label)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if ok, failingConstraint := p.MatchConstraints(constraintTags); !ok {
|
||||||
|
if failingConstraint != nil {
|
||||||
|
log.Debugf("Filtering Marathon application %v pruned by '%v' constraint", app.ID, failingConstraint.String())
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Provider) taskFilter(task marathon.Task, application marathon.Application) bool {
|
||||||
|
if task.State != taskStateRunning {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := processPorts(application, task); err != nil {
|
||||||
log.Errorf("Filtering Marathon task %s from application %s without port: %s", task.ID, application.ID, err)
|
log.Errorf("Filtering Marathon task %s from application %s without port: %s", task.ID, application.ID, err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -225,27 +239,6 @@ func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applica
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter by constraints.
|
|
||||||
label, _ := p.getLabel(application, types.LabelTags)
|
|
||||||
constraintTags := strings.Split(label, ",")
|
|
||||||
if p.MarathonLBCompatibility {
|
|
||||||
if label, ok := p.getLabel(application, "HAPROXY_GROUP"); ok {
|
|
||||||
constraintTags = append(constraintTags, label)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ok, failingConstraint := p.MatchConstraints(constraintTags); !ok {
|
|
||||||
if failingConstraint != nil {
|
|
||||||
log.Debugf("Filtering Marathon task %s from application %s pruned by '%v' constraint", task.ID, application.ID, failingConstraint.String())
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter disabled application.
|
|
||||||
if !isApplicationEnabled(application, exposedByDefaultFlag) {
|
|
||||||
log.Debugf("Filtering disabled Marathon task %s from application %s", task.ID, application.ID)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter task with existing, bad health check results.
|
// Filter task with existing, bad health check results.
|
||||||
if application.HasHealthChecks() {
|
if application.HasHealthChecks() {
|
||||||
if task.HasHealthCheckResults() {
|
if task.HasHealthCheckResults() {
|
||||||
|
@ -261,26 +254,6 @@ func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applica
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) applicationFilter(app marathon.Application, filteredTasks []marathon.Task) bool {
|
|
||||||
label, _ := p.getLabel(app, types.LabelTags)
|
|
||||||
constraintTags := strings.Split(label, ",")
|
|
||||||
if p.MarathonLBCompatibility {
|
|
||||||
if label, ok := p.getLabel(app, "HAPROXY_GROUP"); ok {
|
|
||||||
constraintTags = append(constraintTags, label)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if ok, failingConstraint := p.MatchConstraints(constraintTags); !ok {
|
|
||||||
if failingConstraint != nil {
|
|
||||||
log.Debugf("Application %v pruned by '%v' constraint", app.ID, failingConstraint.String())
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return fun.Exists(func(task marathon.Task) bool {
|
|
||||||
return task.AppID == app.ID
|
|
||||||
}, filteredTasks)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getApplication(task marathon.Task, apps []marathon.Application) (marathon.Application, error) {
|
func getApplication(task marathon.Task, apps []marathon.Application) (marathon.Application, error) {
|
||||||
for _, application := range apps {
|
for _, application := range apps {
|
||||||
if application.ID == task.AppID {
|
if application.ID == task.AppID {
|
||||||
|
@ -303,12 +276,7 @@ func (p *Provider) getLabel(application marathon.Application, label string) (str
|
||||||
return "", false
|
return "", false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) getPort(task marathon.Task, applications []marathon.Application) string {
|
func (p *Provider) getPort(task marathon.Task, application marathon.Application) string {
|
||||||
application, err := getApplication(task, applications)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Unable to get Marathon application %s for task %s", application.ID, task.ID)
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
port, err := processPorts(application, task)
|
port, err := processPorts(application, task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Unable to process ports for Marathon application %s and task %s: %s", application.ID, task.ID, err)
|
log.Errorf("Unable to process ports for Marathon application %s and task %s: %s", application.ID, task.ID, err)
|
||||||
|
@ -318,12 +286,7 @@ func (p *Provider) getPort(task marathon.Task, applications []marathon.Applicati
|
||||||
return strconv.Itoa(port)
|
return strconv.Itoa(port)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) getWeight(task marathon.Task, applications []marathon.Application) string {
|
func (p *Provider) getWeight(application marathon.Application) string {
|
||||||
application, errApp := getApplication(task, applications)
|
|
||||||
if errApp != nil {
|
|
||||||
log.Errorf("Unable to get marathon application from task %s", task.AppID)
|
|
||||||
return "0"
|
|
||||||
}
|
|
||||||
if label, ok := p.getLabel(application, types.LabelWeight); ok {
|
if label, ok := p.getLabel(application, types.LabelWeight); ok {
|
||||||
return label
|
return label
|
||||||
}
|
}
|
||||||
|
@ -337,12 +300,7 @@ func (p *Provider) getDomain(application marathon.Application) string {
|
||||||
return p.Domain
|
return p.Domain
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) getProtocol(task marathon.Task, applications []marathon.Application) string {
|
func (p *Provider) getProtocol(application marathon.Application) string {
|
||||||
application, errApp := getApplication(task, applications)
|
|
||||||
if errApp != nil {
|
|
||||||
log.Errorf("Unable to get marathon application from task %s", task.AppID)
|
|
||||||
return "http"
|
|
||||||
}
|
|
||||||
if label, ok := p.getLabel(application, types.LabelProtocol); ok {
|
if label, ok := p.getLabel(application, types.LabelProtocol); ok {
|
||||||
return label
|
return label
|
||||||
}
|
}
|
||||||
|
@ -391,16 +349,7 @@ func (p *Provider) getFrontendRule(application marathon.Application) string {
|
||||||
return "Host:" + p.getSubDomain(application.ID) + "." + p.Domain
|
return "Host:" + p.getSubDomain(application.ID) + "." + p.Domain
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) getBackend(task marathon.Task, applications []marathon.Application) string {
|
func (p *Provider) getBackend(application marathon.Application) string {
|
||||||
application, errApp := getApplication(task, applications)
|
|
||||||
if errApp != nil {
|
|
||||||
log.Errorf("Unable to get marathon application from task %s", task.AppID)
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
return p.getFrontendBackend(application)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Provider) getFrontendBackend(application marathon.Application) string {
|
|
||||||
if label, ok := p.getLabel(application, types.LabelBackend); ok {
|
if label, ok := p.getLabel(application, types.LabelBackend); ok {
|
||||||
return label
|
return label
|
||||||
}
|
}
|
||||||
|
@ -552,13 +501,7 @@ func retrieveAvailablePorts(application marathon.Application, task marathon.Task
|
||||||
return []int{}
|
return []int{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) getBackendServer(task marathon.Task, applications []marathon.Application) string {
|
func (p *Provider) getBackendServer(task marathon.Task, application marathon.Application) string {
|
||||||
application, err := getApplication(task, applications)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("Unable to get marathon application from task %s", task.AppID)
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
numTaskIPAddresses := len(task.IPAddresses)
|
numTaskIPAddresses := len(task.IPAddresses)
|
||||||
switch {
|
switch {
|
||||||
case application.IPAddressPerTask == nil || p.ForceTaskHostname:
|
case application.IPAddressPerTask == nil || p.ForceTaskHostname:
|
||||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -1,35 +1,38 @@
|
||||||
{{$apps := .Applications}}
|
{{$apps := .Applications}}
|
||||||
[backends]{{range .Tasks}}
|
|
||||||
[backends."backend{{getBackend . $apps}}".servers."server-{{.ID | replace "." "-"}}"]
|
{{range $app := $apps}}
|
||||||
url = "{{getProtocol . $apps}}://{{getBackendServer . $apps}}:{{getPort . $apps}}"
|
{{range $app.Tasks}}
|
||||||
weight = {{getWeight . $apps}}
|
[backends."backend{{getBackend $app}}".servers."server-{{.ID | replace "." "-"}}"]
|
||||||
|
url = "{{getProtocol $app}}://{{getBackendServer . $app}}:{{getPort . $app}}"
|
||||||
|
weight = {{getWeight $app}}
|
||||||
|
{{end}}
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
{{range .Applications}}
|
{{range $apps}}
|
||||||
{{ if hasMaxConnLabels . }}
|
{{ if hasMaxConnLabels . }}
|
||||||
[backends."backend{{getFrontendBackend . }}".maxconn]
|
[backends."backend{{getBackend . }}".maxconn]
|
||||||
amount = {{getMaxConnAmount . }}
|
amount = {{getMaxConnAmount . }}
|
||||||
extractorfunc = "{{getMaxConnExtractorFunc . }}"
|
extractorfunc = "{{getMaxConnExtractorFunc . }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
{{ if hasLoadBalancerLabels . }}
|
{{ if hasLoadBalancerLabels . }}
|
||||||
[backends."backend{{getFrontendBackend . }}".loadbalancer]
|
[backends."backend{{getBackend . }}".loadbalancer]
|
||||||
method = "{{getLoadBalancerMethod . }}"
|
method = "{{getLoadBalancerMethod . }}"
|
||||||
sticky = {{getSticky .}}
|
sticky = {{getSticky .}}
|
||||||
{{end}}
|
{{end}}
|
||||||
{{ if hasCircuitBreakerLabels . }}
|
{{ if hasCircuitBreakerLabels . }}
|
||||||
[backends."backend{{getFrontendBackend . }}".circuitbreaker]
|
[backends."backend{{getBackend . }}".circuitbreaker]
|
||||||
expression = "{{getCircuitBreakerExpression . }}"
|
expression = "{{getCircuitBreakerExpression . }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
{{ if hasHealthCheckLabels . }}
|
{{ if hasHealthCheckLabels . }}
|
||||||
[backends."backend{{getFrontendBackend . }}".healthcheck]
|
[backends."backend{{getBackend . }}".healthcheck]
|
||||||
path = "{{getHealthCheckPath . }}"
|
path = "{{getHealthCheckPath . }}"
|
||||||
interval = "{{getHealthCheckInterval . }}"
|
interval = "{{getHealthCheckInterval . }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
[frontends]{{range .Applications}}
|
[frontends]{{range $apps}}
|
||||||
[frontends."frontend{{.ID | replace "/" "-"}}"]
|
[frontends."frontend{{.ID | replace "/" "-"}}"]
|
||||||
backend = "backend{{getFrontendBackend .}}"
|
backend = "backend{{getBackend .}}"
|
||||||
passHostHeader = {{getPassHostHeader .}}
|
passHostHeader = {{getPassHostHeader .}}
|
||||||
priority = {{getPriority .}}
|
priority = {{getPriority .}}
|
||||||
entryPoints = [{{range getEntryPoints .}}
|
entryPoints = [{{range getEntryPoints .}}
|
||||||
|
|
Loading…
Reference in a new issue