From 53a27876267e02cd613fc48598ea8aa35a921567 Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Mon, 25 Apr 2016 16:56:06 +0200 Subject: [PATCH] Fix watch pods/services/rc/ingresses Signed-off-by: Emile Vauge --- examples/k8s.ingress.yaml | 2 +- examples/k8s.rc.yaml | 2 + provider/k8s/client.go | 238 +++++++++++++++++++++++++++--------- provider/kubernetes.go | 71 +++++++---- provider/kubernetes_test.go | 5 +- 5 files changed, 231 insertions(+), 87 deletions(-) diff --git a/examples/k8s.ingress.yaml b/examples/k8s.ingress.yaml index 0e460f48f..5b6c4c0d0 100644 --- a/examples/k8s.ingress.yaml +++ b/examples/k8s.ingress.yaml @@ -72,7 +72,7 @@ spec: apiVersion: extensions/v1beta1 kind: Ingress metadata: - name: whoamiIngress + name: whoami-ingress spec: rules: - host: foo.localhost diff --git a/examples/k8s.rc.yaml b/examples/k8s.rc.yaml index 9e1c85241..d7232b37d 100644 --- a/examples/k8s.rc.yaml +++ b/examples/k8s.rc.yaml @@ -24,6 +24,8 @@ spec: hostPort: 80 - containerPort: 443 hostPort: 443 + - containerPort: 8080 args: + - --web - --kubernetes - --logLevel=DEBUG \ No newline at end of file diff --git a/provider/k8s/client.go b/provider/k8s/client.go index 770b9f788..de26dd383 100644 --- a/provider/k8s/client.go +++ b/provider/k8s/client.go @@ -7,9 +7,11 @@ import ( "fmt" "github.com/containous/traefik/safe" "github.com/parnurzeal/gorequest" + "net" "net/http" "net/url" "strings" + "time" ) const ( @@ -22,8 +24,8 @@ const ( // Client is a client for the Kubernetes master. type Client interface { GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) - WatchIngresses(predicate func(Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) - GetServices(namespace string, predicate func(Service) bool) ([]Service, error) + GetServices(predicate func(Service) bool) ([]Service, error) + WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) } type clientImpl struct { @@ -52,19 +54,10 @@ func NewClient(baseURL string, caCert []byte, token string) (Client, error) { // GetIngresses returns all services in the cluster func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) { getURL := c.endpointURL + extentionsEndpoint + defaultIngress - request := gorequest.New().Get(getURL) - if len(c.token) > 0 { - request.Header["Authorization"] = "Bearer " + c.token - pool := x509.NewCertPool() - pool.AppendCertsFromPEM(c.caCert) - c.tls = &tls.Config{RootCAs: pool} - } - res, body, errs := request.TLSClientConfig(c.tls).EndBytes() - if errs != nil { - return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, errs) - } - if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, getURL, string(body)) + + body, err := c.do(c.request(getURL)) + if err != nil { + return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err) } var ingressList IngressList @@ -80,29 +73,186 @@ func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, erro return ingresses, nil } -// WatchIngresses returns all services in the cluster -func (c *clientImpl) WatchIngresses(predicate func(Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) { +// WatchIngresses returns all ingresses in the cluster +func (c *clientImpl) WatchIngresses(stopCh <-chan bool) (chan interface{}, chan error, error) { + getURL := c.endpointURL + extentionsEndpoint + defaultIngress + return c.watch(getURL, stopCh) +} + +// GetServices returns all services in the cluster +func (c *clientImpl) GetServices(predicate func(Service) bool) ([]Service, error) { + getURL := c.endpointURL + APIEndpoint + "/services" + + body, err := c.do(c.request(getURL)) + if err != nil { + return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err) + } + + var serviceList ServiceList + if err := json.Unmarshal(body, &serviceList); err != nil { + return nil, fmt.Errorf("failed to decode list of services resources: %v", err) + } + services := serviceList.Items[:0] + for _, service := range serviceList.Items { + if predicate(service) { + services = append(services, service) + } + } + return services, nil +} + +// WatchServices returns all services in the cluster +func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan error, error) { + getURL := c.endpointURL + APIEndpoint + "/services" + return c.watch(getURL, stopCh) +} + +// WatchEvents returns events in the cluster +func (c *clientImpl) WatchEvents(stopCh <-chan bool) (chan interface{}, chan error, error) { + getURL := c.endpointURL + APIEndpoint + "/events" + return c.watch(getURL, stopCh) +} + +// WatchPods returns pods in the cluster +func (c *clientImpl) WatchPods(stopCh <-chan bool) (chan interface{}, chan error, error) { + getURL := c.endpointURL + APIEndpoint + "/pods" + return c.watch(getURL, stopCh) +} + +// WatchReplicationControllers returns ReplicationControllers in the cluster +func (c *clientImpl) WatchReplicationControllers(stopCh <-chan bool) (chan interface{}, chan error, error) { + getURL := c.endpointURL + APIEndpoint + "/replicationcontrollers" + return c.watch(getURL, stopCh) +} + +// WatchAll returns events in the cluster +func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { watchCh := make(chan interface{}) errCh := make(chan error) - getURL := c.endpointURL + extentionsEndpoint + defaultIngress + "?watch=true" + stopIngresses := make(chan bool) + chanIngresses, chanIngressesErr, err := c.WatchIngresses(stopIngresses) + if err != nil { + return watchCh, errCh, fmt.Errorf("failed to create watch %v", err) + } + stopServices := make(chan bool) + chanServices, chanServicesErr, err := c.WatchServices(stopServices) + if err != nil { + return watchCh, errCh, fmt.Errorf("failed to create watch %v", err) + } + stopPods := make(chan bool) + chanPods, chanPodsErr, err := c.WatchPods(stopPods) + if err != nil { + return watchCh, errCh, fmt.Errorf("failed to create watch %v", err) + } + stopReplicationControllers := make(chan bool) + chanReplicationControllers, chanReplicationControllersErr, err := c.WatchReplicationControllers(stopReplicationControllers) + if err != nil { + return watchCh, errCh, fmt.Errorf("failed to create watch %v", err) + } + go func() { + defer close(watchCh) + defer close(errCh) + defer close(stopIngresses) + defer close(stopServices) + defer close(stopPods) + defer close(stopReplicationControllers) + for { + select { + case <-stopCh: + stopIngresses <- true + stopServices <- true + stopPods <- true + stopReplicationControllers <- true + break + case err := <-chanIngressesErr: + errCh <- err + case err := <-chanServicesErr: + errCh <- err + case err := <-chanPodsErr: + errCh <- err + case err := <-chanReplicationControllersErr: + errCh <- err + case event := <-chanIngresses: + watchCh <- event + case event := <-chanServices: + watchCh <- event + case event := <-chanPods: + watchCh <- event + case event := <-chanReplicationControllers: + watchCh <- event + } + } + }() + + return watchCh, errCh, nil +} + +func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) { + res, body, errs := request.EndBytes() + if errs != nil { + return nil, fmt.Errorf("failed to create request: GET %q : %v", request.Url, errs) + } + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, request.Url, string(body)) + } + return body, nil +} + +func (c *clientImpl) request(url string) *gorequest.SuperAgent { // Make request to Kubernetes API - request := gorequest.New().Get(getURL) + request := gorequest.New().Get(url) if len(c.token) > 0 { - request.Set("Authorization", "Bearer "+c.token) + request.Header["Authorization"] = "Bearer " + c.token pool := x509.NewCertPool() pool.AppendCertsFromPEM(c.caCert) c.tls = &tls.Config{RootCAs: pool} } + return request.TLSClientConfig(c.tls) +} + +// GenericObject generic object +type GenericObject struct { + TypeMeta `json:",inline"` + ListMeta `json:"metadata,omitempty"` +} + +func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, chan error, error) { + watchCh := make(chan interface{}) + errCh := make(chan error) + + // get version + body, err := c.do(c.request(url)) + if err != nil { + return watchCh, errCh, fmt.Errorf("failed to create request: GET %q : %v", url, err) + } + + var generic GenericObject + if err := json.Unmarshal(body, &generic); err != nil { + return watchCh, errCh, fmt.Errorf("failed to create request: GET %q : %v", url, err) + } + resourceVersion := generic.ResourceVersion + + url = url + "?watch&resourceVersion=" + resourceVersion + // Make request to Kubernetes API + request := c.request(url) + request.Transport.Dial = func(network, addr string) (net.Conn, error) { + conn, err := net.Dial(network, addr) + if err != nil { + return nil, err + } + // No timeout for long-polling request + conn.SetDeadline(time.Now()) + return conn, nil + } req, err := request.TLSClientConfig(c.tls).MakeRequest() if err != nil { - return watchCh, errCh, fmt.Errorf("failed to create request: GET %q : %v", getURL, err) + return watchCh, errCh, fmt.Errorf("failed to create request: GET %q : %v", url, err) } - request.Client.Transport = request.Transport res, err := request.Client.Do(req) if err != nil { - return watchCh, errCh, fmt.Errorf("failed to make request: GET %q: %v", getURL, err) + return watchCh, errCh, fmt.Errorf("failed to make request: GET %q: %v", url, err) } shouldStop := safe.New(false) @@ -120,49 +270,15 @@ func (c *clientImpl) WatchIngresses(predicate func(Ingress) bool, stopCh <-chan defer close(watchCh) defer close(errCh) for { - var ingressList interface{} - if err := json.NewDecoder(res.Body).Decode(&ingressList); err != nil { + var eventList interface{} + if err := json.NewDecoder(res.Body).Decode(&eventList); err != nil { if !shouldStop.Get().(bool) { - errCh <- fmt.Errorf("failed to decode list of ingress resources: %v", err) + errCh <- fmt.Errorf("failed to decode watch event: %v", err) } return } - - watchCh <- ingressList + watchCh <- eventList } }() return watchCh, errCh, nil } - -// GetServices returns all services in the cluster -func (c *clientImpl) GetServices(namespace string, predicate func(Service) bool) ([]Service, error) { - getURL := c.endpointURL + APIEndpoint + "/namespaces/" + namespace + "/services" - - // Make request to Kubernetes API - request := gorequest.New().Get(getURL) - if len(c.token) > 0 { - request.Header["Authorization"] = "Bearer " + c.token - pool := x509.NewCertPool() - pool.AppendCertsFromPEM(c.caCert) - c.tls = &tls.Config{RootCAs: pool} - } - res, body, errs := request.TLSClientConfig(c.tls).EndBytes() - if errs != nil { - return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, errs) - } - if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, getURL, string(body)) - } - - var serviceList ServiceList - if err := json.Unmarshal(body, &serviceList); err != nil { - return nil, fmt.Errorf("failed to decode list of services resources: %v", err) - } - services := serviceList.Items[:0] - for _, service := range serviceList.Items { - if predicate(service) { - services = append(services, service) - } - } - return services, nil -} diff --git a/provider/kubernetes.go b/provider/kubernetes.go index 53cc07a16..3fc484901 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -6,8 +6,10 @@ import ( "github.com/containous/traefik/provider/k8s" "github.com/containous/traefik/safe" "github.com/containous/traefik/types" + "io" "io/ioutil" "os" + "strings" "text/template" "time" ) @@ -30,13 +32,13 @@ func (provider *Kubernetes) createClient() (k8s.Client, error) { token = string(tokenBytes) log.Debugf("Kubernetes token: %s", token) } else { - log.Debugf("Kubernetes load token error: %s", err) + log.Errorf("Kubernetes load token error: %s", err) } caCert, err := ioutil.ReadFile(serviceAccountCACert) if err == nil { log.Debugf("Kubernetes CA cert: %s", serviceAccountCACert) } else { - log.Debugf("Kubernetes load token error: %s", err) + log.Errorf("Kubernetes load token error: %s", err) } kubernetesHost := os.Getenv("KUBERNETES_SERVICE_HOST") kubernetesPort := os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS") @@ -54,38 +56,45 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage if err != nil { return err } + backOff := backoff.NewExponentialBackOff() pool.Go(func(stop chan bool) { stopWatch := make(chan bool) + defer close(stopWatch) operation := func() error { select { case <-stop: return nil default: } - ingressesChan, errChan, err := k8sClient.WatchIngresses(func(ingress k8s.Ingress) bool { - return true - }, stopWatch) - if err != nil { - log.Errorf("Error retrieving ingresses: %v", err) - return err - } for { - select { - case <-stop: - stopWatch <- true - return nil - case err := <-errChan: + eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch) + if err != nil { + log.Errorf("Error watching kubernetes events: %v", err) return err - case event := <-ingressesChan: - log.Debugf("Received event from kubenetes %+v", event) - templateObjects, err := provider.loadIngresses(k8sClient) - if err != nil { + } + Watch: + for { + select { + case <-stop: + stopWatch <- true + return nil + case err := <-errEventsChan: + if strings.Contains(err.Error(), io.EOF.Error()) { + // edge case, kubernetes long-polling disconnection + break Watch + } return err - } - configurationChan <- types.ConfigMessage{ - ProviderName: "kubernetes", - Configuration: provider.loadConfig(*templateObjects), + case event := <-eventsChan: + log.Debugf("Received event from kubenetes %+v", event) + templateObjects, err := provider.loadIngresses(k8sClient) + if err != nil { + return err + } + configurationChan <- types.ConfigMessage{ + ProviderName: "kubernetes", + Configuration: provider.loadConfig(*templateObjects), + } } } } @@ -94,12 +103,21 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage notify := func(err error, time time.Duration) { log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time) } - err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) + err := backoff.RetryNotify(operation, backOff, notify) if err != nil { log.Fatalf("Cannot connect to Kubernetes server %+v", err) } }) + templateObjects, err := provider.loadIngresses(k8sClient) + if err != nil { + return err + } + configurationChan <- types.ConfigMessage{ + ProviderName: "kubernetes", + Configuration: provider.loadConfig(*templateObjects), + } + return nil } @@ -139,13 +157,18 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur Rule: "PathPrefixStrip:" + pa.Path, } } - services, err := k8sClient.GetServices(i.Namespace, func(service k8s.Service) bool { + services, err := k8sClient.GetServices(func(service k8s.Service) bool { return service.Name == pa.Backend.ServiceName }) if err != nil { log.Errorf("Error retrieving services: %v", err) continue } + if len(services) == 0 { + // no backends found, delete frontend... + delete(templateObjects.Frontends, r.Host+pa.Path) + log.Errorf("Error retrieving services %s", pa.Backend.ServiceName) + } for _, service := range services { var protocol string for _, port := range service.Spec.Ports { diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go index 1b3dd1bd6..04d9b00b5 100644 --- a/provider/kubernetes_test.go +++ b/provider/kubernetes_test.go @@ -179,6 +179,9 @@ func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingres func (c clientMock) WatchIngresses(predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) { return c.watchChan, make(chan error), nil } -func (c clientMock) GetServices(namespace string, predicate func(k8s.Service) bool) ([]k8s.Service, error) { +func (c clientMock) GetServices(predicate func(k8s.Service) bool) ([]k8s.Service, error) { return c.services, nil } +func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { + return c.watchChan, make(chan error), nil +}