173 lines
4.8 KiB
Go
173 lines
4.8 KiB
Go
package mesos
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/cenk/backoff"
|
|
"github.com/containous/traefik/job"
|
|
"github.com/containous/traefik/log"
|
|
"github.com/containous/traefik/provider"
|
|
"github.com/containous/traefik/safe"
|
|
"github.com/containous/traefik/types"
|
|
"github.com/mesos/mesos-go/detector"
|
|
"github.com/mesosphere/mesos-dns/records"
|
|
"github.com/mesosphere/mesos-dns/records/state"
|
|
|
|
// Register mesos zoo the detector
|
|
_ "github.com/mesos/mesos-go/detector/zoo"
|
|
"github.com/mesosphere/mesos-dns/detect"
|
|
"github.com/mesosphere/mesos-dns/logging"
|
|
"github.com/mesosphere/mesos-dns/util"
|
|
)
|
|
|
|
var _ provider.Provider = (*Provider)(nil)
|
|
|
|
// Provider holds configuration of the provider.
|
|
type Provider struct {
|
|
provider.BaseProvider
|
|
Endpoint string `description:"Mesos server endpoint. You can also specify multiple endpoint for Mesos"`
|
|
Domain string `description:"Default domain used"`
|
|
ExposedByDefault bool `description:"Expose Mesos apps by default" export:"true"`
|
|
GroupsAsSubDomains bool `description:"Convert Mesos groups to subdomains" export:"true"`
|
|
ZkDetectionTimeout int `description:"Zookeeper timeout (in seconds)" export:"true"`
|
|
RefreshSeconds int `description:"Polling interval (in seconds)" export:"true"`
|
|
IPSources string `description:"IPSources (e.g. host, docker, mesos, netinfo)" export:"true"`
|
|
StateTimeoutSecond int `description:"HTTP Timeout (in seconds)" export:"true"`
|
|
Masters []string
|
|
}
|
|
|
|
// Init the provider
|
|
func (p *Provider) Init(constraints types.Constraints) error {
|
|
return p.BaseProvider.Init(constraints)
|
|
}
|
|
|
|
// Provide allows the mesos provider to provide configurations to traefik
|
|
// using the given configuration channel.
|
|
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
|
|
operation := func() error {
|
|
|
|
// initialize logging
|
|
logging.SetupLogs()
|
|
|
|
log.Debugf("%s", p.IPSources)
|
|
|
|
var zk string
|
|
var masters []string
|
|
|
|
if strings.HasPrefix(p.Endpoint, "zk://") {
|
|
zk = p.Endpoint
|
|
} else {
|
|
masters = strings.Split(p.Endpoint, ",")
|
|
}
|
|
|
|
errch := make(chan error)
|
|
|
|
changed := detectMasters(zk, masters)
|
|
reload := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
|
|
zkTimeout := time.Second * time.Duration(p.ZkDetectionTimeout)
|
|
timeout := time.AfterFunc(zkTimeout, func() {
|
|
if zkTimeout > 0 {
|
|
errch <- fmt.Errorf("master detection timed out after %s", zkTimeout)
|
|
}
|
|
})
|
|
|
|
defer reload.Stop()
|
|
defer util.HandleCrash()
|
|
|
|
if !p.Watch {
|
|
reload.Stop()
|
|
timeout.Stop()
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-reload.C:
|
|
tasks := p.getTasks()
|
|
configuration := p.buildConfiguration(tasks)
|
|
if configuration != nil {
|
|
configurationChan <- types.ConfigMessage{
|
|
ProviderName: "mesos",
|
|
Configuration: configuration,
|
|
}
|
|
}
|
|
case masters := <-changed:
|
|
if len(masters) == 0 || masters[0] == "" {
|
|
// no leader
|
|
timeout.Reset(zkTimeout)
|
|
} else {
|
|
timeout.Stop()
|
|
}
|
|
log.Debugf("new masters detected: %v", masters)
|
|
p.Masters = masters
|
|
tasks := p.getTasks()
|
|
configuration := p.buildConfiguration(tasks)
|
|
if configuration != nil {
|
|
configurationChan <- types.ConfigMessage{
|
|
ProviderName: "mesos",
|
|
Configuration: configuration,
|
|
}
|
|
}
|
|
case err := <-errch:
|
|
log.Errorf("%s", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
notify := func(err error, time time.Duration) {
|
|
log.Errorf("Mesos connection error %+v, retrying in %s", err, time)
|
|
}
|
|
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
|
|
if err != nil {
|
|
log.Errorf("Cannot connect to Mesos server %+v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func detectMasters(zk string, masters []string) <-chan []string {
|
|
changed := make(chan []string, 1)
|
|
if zk != "" {
|
|
log.Debugf("Starting master detector for ZK %s", zk)
|
|
if md, err := detector.New(zk); err != nil {
|
|
log.Errorf("Failed to create master detector: %v", err)
|
|
} else if err := md.Detect(detect.NewMasters(masters, changed)); err != nil {
|
|
log.Errorf("Failed to initialize master detector: %v", err)
|
|
}
|
|
} else {
|
|
changed <- masters
|
|
}
|
|
return changed
|
|
}
|
|
|
|
func (p *Provider) getTasks() []state.Task {
|
|
rg := records.NewRecordGenerator(time.Duration(p.StateTimeoutSecond) * time.Second)
|
|
|
|
st, err := rg.FindMaster(p.Masters...)
|
|
if err != nil {
|
|
log.Errorf("Failed to create a client for Mesos, error: %v", err)
|
|
return nil
|
|
}
|
|
|
|
return taskRecords(st)
|
|
}
|
|
|
|
func taskRecords(st state.State) []state.Task {
|
|
var tasks []state.Task
|
|
for _, f := range st.Frameworks {
|
|
for _, task := range f.Tasks {
|
|
for _, slave := range st.Slaves {
|
|
if task.SlaveID == slave.ID {
|
|
task.SlaveIP = slave.PID.Host
|
|
}
|
|
}
|
|
|
|
// only do running and discoverable tasks
|
|
if task.State == "TASK_RUNNING" {
|
|
tasks = append(tasks, task)
|
|
}
|
|
}
|
|
}
|
|
|
|
return tasks
|
|
}
|