traefik/provider/kubernetes.go
Emile Vauge 53a2787626
Fix watch pods/services/rc/ingresses
Signed-off-by: Emile Vauge <emile@vauge.com>
2016-04-25 17:13:26 +02:00

198 lines
5.7 KiB
Go

package provider
import (
log "github.com/Sirupsen/logrus"
"github.com/cenkalti/backoff"
"github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"io"
"io/ioutil"
"os"
"strings"
"text/template"
"time"
)
const (
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)
// Kubernetes holds configurations of the Kubernetes provider.
type Kubernetes struct {
BaseProvider `mapstructure:",squash"`
Endpoint string
}
func (provider *Kubernetes) createClient() (k8s.Client, error) {
var token string
tokenBytes, err := ioutil.ReadFile(serviceAccountToken)
if err == nil {
token = string(tokenBytes)
log.Debugf("Kubernetes token: %s", token)
} else {
log.Errorf("Kubernetes load token error: %s", err)
}
caCert, err := ioutil.ReadFile(serviceAccountCACert)
if err == nil {
log.Debugf("Kubernetes CA cert: %s", serviceAccountCACert)
} else {
log.Errorf("Kubernetes load token error: %s", err)
}
kubernetesHost := os.Getenv("KUBERNETES_SERVICE_HOST")
kubernetesPort := os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS")
if len(kubernetesPort) > 0 && len(kubernetesHost) > 0 {
provider.Endpoint = "https://" + kubernetesHost + ":" + kubernetesPort
}
log.Debugf("Kubernetes endpoint: %s", provider.Endpoint)
return k8s.NewClient(provider.Endpoint, caCert, token)
}
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
k8sClient, err := provider.createClient()
if err != nil {
return err
}
backOff := backoff.NewExponentialBackOff()
pool.Go(func(stop chan bool) {
stopWatch := make(chan bool)
defer close(stopWatch)
operation := func() error {
select {
case <-stop:
return nil
default:
}
for {
eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch)
if err != nil {
log.Errorf("Error watching kubernetes events: %v", err)
return err
}
Watch:
for {
select {
case <-stop:
stopWatch <- true
return nil
case err := <-errEventsChan:
if strings.Contains(err.Error(), io.EOF.Error()) {
// edge case, kubernetes long-polling disconnection
break Watch
}
return err
case event := <-eventsChan:
log.Debugf("Received event from kubenetes %+v", event)
templateObjects, err := provider.loadIngresses(k8sClient)
if err != nil {
return err
}
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
}
}
}
}
}
notify := func(err error, time time.Duration) {
log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backOff, notify)
if err != nil {
log.Fatalf("Cannot connect to Kubernetes server %+v", err)
}
})
templateObjects, err := provider.loadIngresses(k8sClient)
if err != nil {
return err
}
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
}
return nil
}
func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configuration, error) {
ingresses, err := k8sClient.GetIngresses(func(ingress k8s.Ingress) bool {
return true
})
if err != nil {
log.Errorf("Error retrieving ingresses: %+v", err)
return nil, err
}
templateObjects := types.Configuration{
map[string]*types.Backend{},
map[string]*types.Frontend{},
}
for _, i := range ingresses {
for _, r := range i.Spec.Rules {
for _, pa := range r.HTTP.Paths {
if _, exists := templateObjects.Backends[r.Host+pa.Path]; !exists {
templateObjects.Backends[r.Host+pa.Path] = &types.Backend{
Servers: make(map[string]types.Server),
}
}
if _, exists := templateObjects.Frontends[r.Host+pa.Path]; !exists {
templateObjects.Frontends[r.Host+pa.Path] = &types.Frontend{
Backend: r.Host + pa.Path,
Routes: make(map[string]types.Route),
}
}
if _, exists := templateObjects.Frontends[r.Host+pa.Path].Routes[r.Host]; !exists {
templateObjects.Frontends[r.Host+pa.Path].Routes[r.Host] = types.Route{
Rule: "Host:" + r.Host,
}
}
if len(pa.Path) > 0 {
templateObjects.Frontends[r.Host+pa.Path].Routes[pa.Path] = types.Route{
Rule: "PathPrefixStrip:" + pa.Path,
}
}
services, err := k8sClient.GetServices(func(service k8s.Service) bool {
return service.Name == pa.Backend.ServiceName
})
if err != nil {
log.Errorf("Error retrieving services: %v", err)
continue
}
if len(services) == 0 {
// no backends found, delete frontend...
delete(templateObjects.Frontends, r.Host+pa.Path)
log.Errorf("Error retrieving services %s", pa.Backend.ServiceName)
}
for _, service := range services {
var protocol string
for _, port := range service.Spec.Ports {
if port.Port == pa.Backend.ServicePort.IntValue() {
protocol = port.Name
templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{
URL: protocol + "://" + service.Spec.ClusterIP + ":" + pa.Backend.ServicePort.String(),
Weight: 1,
}
break
}
}
}
}
}
}
return &templateObjects, nil
}
func (provider *Kubernetes) loadConfig(templateObjects types.Configuration) *types.Configuration {
var FuncMap = template.FuncMap{}
configuration, err := provider.getConfiguration("templates/kubernetes.tmpl", FuncMap, templateObjects)
if err != nil {
log.Error(err)
}
return configuration
}