package marathon import ( "context" "errors" "fmt" "math" "net" "strconv" "strings" "github.com/containous/traefik/v2/pkg/config/dynamic" "github.com/containous/traefik/v2/pkg/config/label" "github.com/containous/traefik/v2/pkg/log" "github.com/containous/traefik/v2/pkg/provider" "github.com/containous/traefik/v2/pkg/provider/constraints" "github.com/gambol99/go-marathon" ) func (p *Provider) buildConfiguration(ctx context.Context, applications *marathon.Applications) *dynamic.Configuration { configurations := make(map[string]*dynamic.Configuration) for _, app := range applications.Apps { ctxApp := log.With(ctx, log.Str("applicationID", app.ID)) logger := log.FromContext(ctxApp) extraConf, err := p.getConfiguration(app) if err != nil { logger.Errorf("Skip application: %v", err) continue } labels := stringValueMap(app.Labels) if app.Constraints != nil { for i, constraintParts := range *app.Constraints { key := constraints.MarathonConstraintPrefix + "-" + strconv.Itoa(i) labels[key] = strings.Join(constraintParts, ":") } } if !p.keepApplication(ctxApp, extraConf, labels) { continue } confFromLabel, err := label.DecodeConfiguration(labels) if err != nil { logger.Error(err) continue } if len(confFromLabel.TCP.Routers) > 0 || len(confFromLabel.TCP.Services) > 0 { err := p.buildTCPServiceConfiguration(ctxApp, app, extraConf, confFromLabel.TCP) if err != nil { logger.Error(err) continue } provider.BuildTCPRouterConfiguration(ctxApp, confFromLabel.TCP) if len(confFromLabel.HTTP.Routers) == 0 && len(confFromLabel.HTTP.Middlewares) == 0 && len(confFromLabel.HTTP.Services) == 0 { configurations[app.ID] = confFromLabel continue } } err = p.buildServiceConfiguration(ctxApp, app, extraConf, confFromLabel.HTTP) if err != nil { logger.Error(err) continue } model := struct { Name string Labels map[string]string }{ Name: app.ID, Labels: labels, } serviceName := getServiceName(app) provider.BuildRouterConfiguration(ctxApp, confFromLabel.HTTP, serviceName, p.defaultRuleTpl, model) configurations[app.ID] = confFromLabel } return provider.Merge(ctx, configurations) } func getServiceName(app marathon.Application) string { return strings.Replace(strings.TrimPrefix(app.ID, "/"), "/", "_", -1) } func (p *Provider) buildServiceConfiguration(ctx context.Context, app marathon.Application, extraConf configuration, conf *dynamic.HTTPConfiguration) error { appName := getServiceName(app) appCtx := log.With(ctx, log.Str("ApplicationID", appName)) if len(conf.Services) == 0 { conf.Services = make(map[string]*dynamic.Service) lb := &dynamic.ServersLoadBalancer{} lb.SetDefaults() conf.Services[appName] = &dynamic.Service{ LoadBalancer: lb, } } for serviceName, service := range conf.Services { var servers []dynamic.Server defaultServer := dynamic.Server{} defaultServer.SetDefaults() if len(service.LoadBalancer.Servers) > 0 { defaultServer = service.LoadBalancer.Servers[0] } for _, task := range app.Tasks { if p.taskFilter(ctx, *task, app) { server, err := p.getServer(app, *task, extraConf, defaultServer) if err != nil { log.FromContext(appCtx).Errorf("Skip task: %v", err) continue } servers = append(servers, server) } } if len(servers) == 0 { return fmt.Errorf("no server for the service %s", serviceName) } service.LoadBalancer.Servers = servers } return nil } func (p *Provider) buildTCPServiceConfiguration(ctx context.Context, app marathon.Application, extraConf configuration, conf *dynamic.TCPConfiguration) error { appName := getServiceName(app) appCtx := log.With(ctx, log.Str("ApplicationID", appName)) if len(conf.Services) == 0 { conf.Services = make(map[string]*dynamic.TCPService) lb := &dynamic.TCPServersLoadBalancer{} lb.SetDefaults() conf.Services[appName] = &dynamic.TCPService{ LoadBalancer: lb, } } for serviceName, service := range conf.Services { var servers []dynamic.TCPServer defaultServer := dynamic.TCPServer{} if len(service.LoadBalancer.Servers) > 0 { defaultServer = service.LoadBalancer.Servers[0] } for _, task := range app.Tasks { if p.taskFilter(ctx, *task, app) { server, err := p.getTCPServer(app, *task, extraConf, defaultServer) if err != nil { log.FromContext(appCtx).Errorf("Skip task: %v", err) continue } servers = append(servers, server) } } if len(servers) == 0 { return fmt.Errorf("no server for the service %s", serviceName) } service.LoadBalancer.Servers = servers } return nil } func (p *Provider) keepApplication(ctx context.Context, extraConf configuration, labels map[string]string) bool { logger := log.FromContext(ctx) // Filter disabled application. if !extraConf.Enable { logger.Debug("Filtering disabled Marathon application") return false } // Filter by constraints. matches, err := constraints.MatchLabels(labels, p.Constraints) if err != nil { logger.Errorf("Error matching constraints expression: %v", err) return false } if !matches { logger.Debugf("Marathon application filtered by constraint expression: %q", p.Constraints) return false } return true } func (p *Provider) taskFilter(ctx context.Context, task marathon.Task, application marathon.Application) bool { if task.State != string(taskStateRunning) { return false } if ready := p.readyChecker.Do(task, application); !ready { log.FromContext(ctx).Infof("Filtering unready task %s from application %s", task.ID, application.ID) return false } return true } func (p *Provider) getTCPServer(app marathon.Application, task marathon.Task, extraConf configuration, defaultServer dynamic.TCPServer) (dynamic.TCPServer, error) { host, err := p.getServerHost(task, app, extraConf) if len(host) == 0 { return dynamic.TCPServer{}, err } port, err := getPort(task, app, defaultServer.Port) if err != nil { return dynamic.TCPServer{}, err } server := dynamic.TCPServer{ Address: net.JoinHostPort(host, port), } return server, nil } func (p *Provider) getServer(app marathon.Application, task marathon.Task, extraConf configuration, defaultServer dynamic.Server) (dynamic.Server, error) { host, err := p.getServerHost(task, app, extraConf) if len(host) == 0 { return dynamic.Server{}, err } port, err := getPort(task, app, defaultServer.Port) if err != nil { return dynamic.Server{}, err } server := dynamic.Server{ URL: fmt.Sprintf("%s://%s", defaultServer.Scheme, net.JoinHostPort(host, port)), } return server, nil } func (p *Provider) getServerHost(task marathon.Task, app marathon.Application, extraConf configuration) (string, error) { networks := app.Networks var hostFlag bool if networks == nil { hostFlag = app.IPAddressPerTask == nil } else { hostFlag = (*networks)[0].Mode != marathon.ContainerNetworkMode } if hostFlag || p.ForceTaskHostname { if len(task.Host) == 0 { return "", fmt.Errorf("host is undefined for task %q app %q", task.ID, app.ID) } return task.Host, nil } numTaskIPAddresses := len(task.IPAddresses) switch numTaskIPAddresses { case 0: return "", fmt.Errorf("missing IP address for Marathon application %s on task %s", app.ID, task.ID) case 1: return task.IPAddresses[0].IPAddress, nil default: if extraConf.Marathon.IPAddressIdx == math.MinInt32 { return "", fmt.Errorf("found %d task IP addresses but missing IP address index for Marathon application %s on task %s", numTaskIPAddresses, app.ID, task.ID) } if extraConf.Marathon.IPAddressIdx < 0 || extraConf.Marathon.IPAddressIdx > numTaskIPAddresses { return "", fmt.Errorf("cannot use IP address index to select from %d task IP addresses for Marathon application %s on task %s", numTaskIPAddresses, app.ID, task.ID) } return task.IPAddresses[extraConf.Marathon.IPAddressIdx].IPAddress, nil } } func getPort(task marathon.Task, app marathon.Application, serverPort string) (string, error) { port, err := processPorts(app, task, serverPort) if err != nil { return "", fmt.Errorf("unable to process ports for %s %s: %v", app.ID, task.ID, err) } return strconv.Itoa(port), nil } // processPorts returns the configured port. // An explicitly specified port is preferred. If none is specified, it selects // one of the available port. The first such found port is returned unless an // optional index is provided. func processPorts(app marathon.Application, task marathon.Task, serverPort string) (int, error) { if len(serverPort) > 0 && !strings.HasPrefix(serverPort, "index:") { port, err := strconv.Atoi(serverPort) if err != nil { return 0, err } if port <= 0 { return 0, fmt.Errorf("explicitly specified port %d must be greater than zero", port) } else if port > 0 { return port, nil } } ports := retrieveAvailablePorts(app, task) if len(ports) == 0 { return 0, errors.New("no port found") } portIndex := 0 if strings.HasPrefix(serverPort, "index:") { split := strings.SplitN(serverPort, ":", 2) index, err := strconv.Atoi(split[1]) if err != nil { return 0, err } if index < 0 || index > len(ports)-1 { return 0, fmt.Errorf("index %d must be within range (0, %d)", index, len(ports)-1) } portIndex = index } return ports[portIndex], nil } func retrieveAvailablePorts(app marathon.Application, task marathon.Task) []int { // Using default port configuration if len(task.Ports) > 0 { return task.Ports } // Using port definition if available if app.PortDefinitions != nil && len(*app.PortDefinitions) > 0 { var ports []int for _, def := range *app.PortDefinitions { if def.Port != nil { ports = append(ports, *def.Port) } } return ports } // If using IP-per-task using this port definition if app.IPAddressPerTask != nil && app.IPAddressPerTask.Discovery != nil && len(*(app.IPAddressPerTask.Discovery.Ports)) > 0 { var ports []int for _, def := range *(app.IPAddressPerTask.Discovery.Ports) { ports = append(ports, def.Number) } return ports } return []int{} }