Fix watch pods/services/rc/ingresses

Signed-off-by: Emile Vauge <emile@vauge.com>
This commit is contained in:
Emile Vauge 2016-04-25 16:56:06 +02:00
parent cac9927395
commit 53a2787626
No known key found for this signature in database
GPG key ID: D808B4C167352E59
5 changed files with 231 additions and 87 deletions

View file

@ -72,7 +72,7 @@ spec:
apiVersion: extensions/v1beta1 apiVersion: extensions/v1beta1
kind: Ingress kind: Ingress
metadata: metadata:
name: whoamiIngress name: whoami-ingress
spec: spec:
rules: rules:
- host: foo.localhost - host: foo.localhost

View file

@ -24,6 +24,8 @@ spec:
hostPort: 80 hostPort: 80
- containerPort: 443 - containerPort: 443
hostPort: 443 hostPort: 443
- containerPort: 8080
args: args:
- --web
- --kubernetes - --kubernetes
- --logLevel=DEBUG - --logLevel=DEBUG

View file

@ -7,9 +7,11 @@ import (
"fmt" "fmt"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/parnurzeal/gorequest" "github.com/parnurzeal/gorequest"
"net"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
"time"
) )
const ( const (
@ -22,8 +24,8 @@ const (
// Client is a client for the Kubernetes master. // Client is a client for the Kubernetes master.
type Client interface { type Client interface {
GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) GetIngresses(predicate func(Ingress) bool) ([]Ingress, error)
WatchIngresses(predicate func(Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) GetServices(predicate func(Service) bool) ([]Service, error)
GetServices(namespace string, predicate func(Service) bool) ([]Service, error) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error)
} }
type clientImpl struct { type clientImpl struct {
@ -52,19 +54,10 @@ func NewClient(baseURL string, caCert []byte, token string) (Client, error) {
// GetIngresses returns all services in the cluster // GetIngresses returns all services in the cluster
func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) { func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) {
getURL := c.endpointURL + extentionsEndpoint + defaultIngress getURL := c.endpointURL + extentionsEndpoint + defaultIngress
request := gorequest.New().Get(getURL)
if len(c.token) > 0 { body, err := c.do(c.request(getURL))
request.Header["Authorization"] = "Bearer " + c.token if err != nil {
pool := x509.NewCertPool() return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
pool.AppendCertsFromPEM(c.caCert)
c.tls = &tls.Config{RootCAs: pool}
}
res, body, errs := request.TLSClientConfig(c.tls).EndBytes()
if errs != nil {
return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, errs)
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, getURL, string(body))
} }
var ingressList IngressList var ingressList IngressList
@ -80,29 +73,186 @@ func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, erro
return ingresses, nil return ingresses, nil
} }
// WatchIngresses returns all services in the cluster // WatchIngresses returns all ingresses in the cluster
func (c *clientImpl) WatchIngresses(predicate func(Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) WatchIngresses(stopCh <-chan bool) (chan interface{}, chan error, error) {
getURL := c.endpointURL + extentionsEndpoint + defaultIngress
return c.watch(getURL, stopCh)
}
// GetServices returns all services in the cluster
func (c *clientImpl) GetServices(predicate func(Service) bool) ([]Service, error) {
getURL := c.endpointURL + APIEndpoint + "/services"
body, err := c.do(c.request(getURL))
if err != nil {
return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
}
var serviceList ServiceList
if err := json.Unmarshal(body, &serviceList); err != nil {
return nil, fmt.Errorf("failed to decode list of services resources: %v", err)
}
services := serviceList.Items[:0]
for _, service := range serviceList.Items {
if predicate(service) {
services = append(services, service)
}
}
return services, nil
}
// WatchServices returns all services in the cluster
func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan error, error) {
getURL := c.endpointURL + APIEndpoint + "/services"
return c.watch(getURL, stopCh)
}
// WatchEvents returns events in the cluster
func (c *clientImpl) WatchEvents(stopCh <-chan bool) (chan interface{}, chan error, error) {
getURL := c.endpointURL + APIEndpoint + "/events"
return c.watch(getURL, stopCh)
}
// WatchPods returns pods in the cluster
func (c *clientImpl) WatchPods(stopCh <-chan bool) (chan interface{}, chan error, error) {
getURL := c.endpointURL + APIEndpoint + "/pods"
return c.watch(getURL, stopCh)
}
// WatchReplicationControllers returns ReplicationControllers in the cluster
func (c *clientImpl) WatchReplicationControllers(stopCh <-chan bool) (chan interface{}, chan error, error) {
getURL := c.endpointURL + APIEndpoint + "/replicationcontrollers"
return c.watch(getURL, stopCh)
}
// WatchAll returns events in the cluster
func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) {
watchCh := make(chan interface{}) watchCh := make(chan interface{})
errCh := make(chan error) errCh := make(chan error)
getURL := c.endpointURL + extentionsEndpoint + defaultIngress + "?watch=true" stopIngresses := make(chan bool)
chanIngresses, chanIngressesErr, err := c.WatchIngresses(stopIngresses)
if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch %v", err)
}
stopServices := make(chan bool)
chanServices, chanServicesErr, err := c.WatchServices(stopServices)
if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch %v", err)
}
stopPods := make(chan bool)
chanPods, chanPodsErr, err := c.WatchPods(stopPods)
if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch %v", err)
}
stopReplicationControllers := make(chan bool)
chanReplicationControllers, chanReplicationControllersErr, err := c.WatchReplicationControllers(stopReplicationControllers)
if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch %v", err)
}
go func() {
defer close(watchCh)
defer close(errCh)
defer close(stopIngresses)
defer close(stopServices)
defer close(stopPods)
defer close(stopReplicationControllers)
for {
select {
case <-stopCh:
stopIngresses <- true
stopServices <- true
stopPods <- true
stopReplicationControllers <- true
break
case err := <-chanIngressesErr:
errCh <- err
case err := <-chanServicesErr:
errCh <- err
case err := <-chanPodsErr:
errCh <- err
case err := <-chanReplicationControllersErr:
errCh <- err
case event := <-chanIngresses:
watchCh <- event
case event := <-chanServices:
watchCh <- event
case event := <-chanPods:
watchCh <- event
case event := <-chanReplicationControllers:
watchCh <- event
}
}
}()
return watchCh, errCh, nil
}
func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) {
res, body, errs := request.EndBytes()
if errs != nil {
return nil, fmt.Errorf("failed to create request: GET %q : %v", request.Url, errs)
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, request.Url, string(body))
}
return body, nil
}
func (c *clientImpl) request(url string) *gorequest.SuperAgent {
// Make request to Kubernetes API // Make request to Kubernetes API
request := gorequest.New().Get(getURL) request := gorequest.New().Get(url)
if len(c.token) > 0 { if len(c.token) > 0 {
request.Set("Authorization", "Bearer "+c.token) request.Header["Authorization"] = "Bearer " + c.token
pool := x509.NewCertPool() pool := x509.NewCertPool()
pool.AppendCertsFromPEM(c.caCert) pool.AppendCertsFromPEM(c.caCert)
c.tls = &tls.Config{RootCAs: pool} c.tls = &tls.Config{RootCAs: pool}
} }
return request.TLSClientConfig(c.tls)
}
// GenericObject generic object
type GenericObject struct {
TypeMeta `json:",inline"`
ListMeta `json:"metadata,omitempty"`
}
func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, chan error, error) {
watchCh := make(chan interface{})
errCh := make(chan error)
// get version
body, err := c.do(c.request(url))
if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create request: GET %q : %v", url, err)
}
var generic GenericObject
if err := json.Unmarshal(body, &generic); err != nil {
return watchCh, errCh, fmt.Errorf("failed to create request: GET %q : %v", url, err)
}
resourceVersion := generic.ResourceVersion
url = url + "?watch&resourceVersion=" + resourceVersion
// Make request to Kubernetes API
request := c.request(url)
request.Transport.Dial = func(network, addr string) (net.Conn, error) {
conn, err := net.Dial(network, addr)
if err != nil {
return nil, err
}
// No timeout for long-polling request
conn.SetDeadline(time.Now())
return conn, nil
}
req, err := request.TLSClientConfig(c.tls).MakeRequest() req, err := request.TLSClientConfig(c.tls).MakeRequest()
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create request: GET %q : %v", getURL, err) return watchCh, errCh, fmt.Errorf("failed to create request: GET %q : %v", url, err)
} }
request.Client.Transport = request.Transport
res, err := request.Client.Do(req) res, err := request.Client.Do(req)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to make request: GET %q: %v", getURL, err) return watchCh, errCh, fmt.Errorf("failed to make request: GET %q: %v", url, err)
} }
shouldStop := safe.New(false) shouldStop := safe.New(false)
@ -120,49 +270,15 @@ func (c *clientImpl) WatchIngresses(predicate func(Ingress) bool, stopCh <-chan
defer close(watchCh) defer close(watchCh)
defer close(errCh) defer close(errCh)
for { for {
var ingressList interface{} var eventList interface{}
if err := json.NewDecoder(res.Body).Decode(&ingressList); err != nil { if err := json.NewDecoder(res.Body).Decode(&eventList); err != nil {
if !shouldStop.Get().(bool) { if !shouldStop.Get().(bool) {
errCh <- fmt.Errorf("failed to decode list of ingress resources: %v", err) errCh <- fmt.Errorf("failed to decode watch event: %v", err)
} }
return return
} }
watchCh <- eventList
watchCh <- ingressList
} }
}() }()
return watchCh, errCh, nil return watchCh, errCh, nil
} }
// GetServices returns all services in the cluster
func (c *clientImpl) GetServices(namespace string, predicate func(Service) bool) ([]Service, error) {
getURL := c.endpointURL + APIEndpoint + "/namespaces/" + namespace + "/services"
// Make request to Kubernetes API
request := gorequest.New().Get(getURL)
if len(c.token) > 0 {
request.Header["Authorization"] = "Bearer " + c.token
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(c.caCert)
c.tls = &tls.Config{RootCAs: pool}
}
res, body, errs := request.TLSClientConfig(c.tls).EndBytes()
if errs != nil {
return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, errs)
}
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, getURL, string(body))
}
var serviceList ServiceList
if err := json.Unmarshal(body, &serviceList); err != nil {
return nil, fmt.Errorf("failed to decode list of services resources: %v", err)
}
services := serviceList.Items[:0]
for _, service := range serviceList.Items {
if predicate(service) {
services = append(services, service)
}
}
return services, nil
}

View file

@ -6,8 +6,10 @@ import (
"github.com/containous/traefik/provider/k8s" "github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/safe" "github.com/containous/traefik/safe"
"github.com/containous/traefik/types" "github.com/containous/traefik/types"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"strings"
"text/template" "text/template"
"time" "time"
) )
@ -30,13 +32,13 @@ func (provider *Kubernetes) createClient() (k8s.Client, error) {
token = string(tokenBytes) token = string(tokenBytes)
log.Debugf("Kubernetes token: %s", token) log.Debugf("Kubernetes token: %s", token)
} else { } else {
log.Debugf("Kubernetes load token error: %s", err) log.Errorf("Kubernetes load token error: %s", err)
} }
caCert, err := ioutil.ReadFile(serviceAccountCACert) caCert, err := ioutil.ReadFile(serviceAccountCACert)
if err == nil { if err == nil {
log.Debugf("Kubernetes CA cert: %s", serviceAccountCACert) log.Debugf("Kubernetes CA cert: %s", serviceAccountCACert)
} else { } else {
log.Debugf("Kubernetes load token error: %s", err) log.Errorf("Kubernetes load token error: %s", err)
} }
kubernetesHost := os.Getenv("KUBERNETES_SERVICE_HOST") kubernetesHost := os.Getenv("KUBERNETES_SERVICE_HOST")
kubernetesPort := os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS") kubernetesPort := os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS")
@ -54,38 +56,45 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
if err != nil { if err != nil {
return err return err
} }
backOff := backoff.NewExponentialBackOff()
pool.Go(func(stop chan bool) { pool.Go(func(stop chan bool) {
stopWatch := make(chan bool) stopWatch := make(chan bool)
defer close(stopWatch)
operation := func() error { operation := func() error {
select { select {
case <-stop: case <-stop:
return nil return nil
default: default:
} }
ingressesChan, errChan, err := k8sClient.WatchIngresses(func(ingress k8s.Ingress) bool {
return true
}, stopWatch)
if err != nil {
log.Errorf("Error retrieving ingresses: %v", err)
return err
}
for { for {
select { eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch)
case <-stop: if err != nil {
stopWatch <- true log.Errorf("Error watching kubernetes events: %v", err)
return nil
case err := <-errChan:
return err return err
case event := <-ingressesChan: }
log.Debugf("Received event from kubenetes %+v", event) Watch:
templateObjects, err := provider.loadIngresses(k8sClient) for {
if err != nil { select {
case <-stop:
stopWatch <- true
return nil
case err := <-errEventsChan:
if strings.Contains(err.Error(), io.EOF.Error()) {
// edge case, kubernetes long-polling disconnection
break Watch
}
return err return err
} case event := <-eventsChan:
configurationChan <- types.ConfigMessage{ log.Debugf("Received event from kubenetes %+v", event)
ProviderName: "kubernetes", templateObjects, err := provider.loadIngresses(k8sClient)
Configuration: provider.loadConfig(*templateObjects), if err != nil {
return err
}
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
}
} }
} }
} }
@ -94,12 +103,21 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
notify := func(err error, time time.Duration) { notify := func(err error, time time.Duration) {
log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time) log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time)
} }
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) err := backoff.RetryNotify(operation, backOff, notify)
if err != nil { if err != nil {
log.Fatalf("Cannot connect to Kubernetes server %+v", err) log.Fatalf("Cannot connect to Kubernetes server %+v", err)
} }
}) })
templateObjects, err := provider.loadIngresses(k8sClient)
if err != nil {
return err
}
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
}
return nil return nil
} }
@ -139,13 +157,18 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
Rule: "PathPrefixStrip:" + pa.Path, Rule: "PathPrefixStrip:" + pa.Path,
} }
} }
services, err := k8sClient.GetServices(i.Namespace, func(service k8s.Service) bool { services, err := k8sClient.GetServices(func(service k8s.Service) bool {
return service.Name == pa.Backend.ServiceName return service.Name == pa.Backend.ServiceName
}) })
if err != nil { if err != nil {
log.Errorf("Error retrieving services: %v", err) log.Errorf("Error retrieving services: %v", err)
continue continue
} }
if len(services) == 0 {
// no backends found, delete frontend...
delete(templateObjects.Frontends, r.Host+pa.Path)
log.Errorf("Error retrieving services %s", pa.Backend.ServiceName)
}
for _, service := range services { for _, service := range services {
var protocol string var protocol string
for _, port := range service.Spec.Ports { for _, port := range service.Spec.Ports {

View file

@ -179,6 +179,9 @@ func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingres
func (c clientMock) WatchIngresses(predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) { func (c clientMock) WatchIngresses(predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) {
return c.watchChan, make(chan error), nil return c.watchChan, make(chan error), nil
} }
func (c clientMock) GetServices(namespace string, predicate func(k8s.Service) bool) ([]k8s.Service, error) { func (c clientMock) GetServices(predicate func(k8s.Service) bool) ([]k8s.Service, error) {
return c.services, nil return c.services, nil
} }
func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) {
return c.watchChan, make(chan error), nil
}