From 9227d32d57ecb53d69bc936a704260a2fd377741 Mon Sep 17 00:00:00 2001 From: Daniel Tomcej Date: Fri, 18 May 2018 06:12:03 -0600 Subject: [PATCH] Enable Ingress Status updates --- docs/configuration/backends/kubernetes.md | 20 +++ provider/kubernetes/builder_service_test.go | 28 +++- provider/kubernetes/client.go | 30 ++++ provider/kubernetes/client_mock_test.go | 13 +- provider/kubernetes/kubernetes.go | 69 ++++++++-- provider/kubernetes/kubernetes_test.go | 143 ++++++++++++++++++++ 6 files changed, 289 insertions(+), 14 deletions(-) diff --git a/docs/configuration/backends/kubernetes.md b/docs/configuration/backends/kubernetes.md index 53e26c230..4d3dff920 100644 --- a/docs/configuration/backends/kubernetes.md +++ b/docs/configuration/backends/kubernetes.md @@ -81,6 +81,20 @@ See also [Kubernetes user guide](/user-guide/kubernetes). # Default: # # filename = "kubernetes.tmpl" + +# Enable IngressEndpoint configuration. +# This will allow Traefik to update the status section of ingress objects, if desired. +# +# Optional +# +# [kubernetes.ingressEndpoint] +# +# At least one must be configured. +# `publishedservice` will override the `hostname` and `ip` settings if configured. +# +# hostname = "localhost" +# ip = "127.0.0.1" +# publishedService = "namespace/servicename" ``` ### `endpoint` @@ -105,6 +119,12 @@ A label selector can be defined to filter on specific Ingress objects only. See [label-selectors](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors) for details. +### `ingressEndpoint` + +You can configure a static hostname or IP address that Traefik will add to the status section of Ingress objects that it manages. +If you prefer, you can provide a service, which traefik will copy the status spec from. +This will give more flexibility in cloud/dynamic environments. + ### TLS communication between Traefik and backend pods Traefik automatically requests endpoint information based on the service provided in the ingress spec. diff --git a/provider/kubernetes/builder_service_test.go b/provider/kubernetes/builder_service_test.go index 84511080a..1225be328 100644 --- a/provider/kubernetes/builder_service_test.go +++ b/provider/kubernetes/builder_service_test.go @@ -45,12 +45,36 @@ func sAnnotation(name string, value string) func(*corev1.Service) { } func sSpec(opts ...func(*corev1.ServiceSpec)) func(*corev1.Service) { - return func(i *corev1.Service) { + return func(s *corev1.Service) { spec := &corev1.ServiceSpec{} for _, opt := range opts { opt(spec) } - i.Spec = *spec + s.Spec = *spec + } +} + +func sLoadBalancerStatus(opts ...func(*corev1.LoadBalancerStatus)) func(service *corev1.Service) { + return func(s *corev1.Service) { + loadBalancer := &corev1.LoadBalancerStatus{} + for _, opt := range opts { + if opt != nil { + opt(loadBalancer) + } + } + s.Status = corev1.ServiceStatus{ + LoadBalancer: *loadBalancer, + } + } +} + +func sLoadBalancerIngress(ip string, hostname string) func(*corev1.LoadBalancerStatus) { + return func(status *corev1.LoadBalancerStatus) { + ingress := corev1.LoadBalancerIngress{ + IP: ip, + Hostname: hostname, + } + status.Ingress = append(status.Ingress, ingress) } } diff --git a/provider/kubernetes/client.go b/provider/kubernetes/client.go index 885f14b81..706d66e20 100644 --- a/provider/kubernetes/client.go +++ b/provider/kubernetes/client.go @@ -44,6 +44,7 @@ type Client interface { GetService(namespace, name string) (*corev1.Service, bool, error) GetSecret(namespace, name string) (*corev1.Secret, bool, error) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) + UpdateIngressStatus(namespace, name, ip, hostname string) error } type clientImpl struct { @@ -166,6 +167,35 @@ func (c *clientImpl) GetIngresses() []*extensionsv1beta1.Ingress { return result } +// UpdateIngressStatus updates an Ingress with a provided status. +func (c *clientImpl) UpdateIngressStatus(namespace, name, ip, hostname string) error { + keyName := namespace + "/" + name + + item, exists, err := c.factories[c.lookupNamespace(namespace)].Extensions().V1beta1().Ingresses().Informer().GetStore().GetByKey(keyName) + if err != nil { + return fmt.Errorf("failed to get ingress %s with error: %v", keyName, err) + } + if !exists { + return fmt.Errorf("failed to update ingress %s because it does not exist", keyName) + } + + ing := item.(*extensionsv1beta1.Ingress) + if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing.Status.LoadBalancer.Ingress[0].IP == ip { + // If status is already set, skip update + log.Debugf("Skipping status update on ingress %s/%s", ing.Namespace, ing.Name) + } else { + ingCopy := ing.DeepCopy() + ingCopy.Status = extensionsv1beta1.IngressStatus{LoadBalancer: corev1.LoadBalancerStatus{Ingress: []corev1.LoadBalancerIngress{{IP: ip, Hostname: hostname}}}} + + _, err := c.clientset.ExtensionsV1beta1().Ingresses(ingCopy.Namespace).UpdateStatus(ingCopy) + if err != nil { + return fmt.Errorf("failed to update ingress status %s with error: %v", keyName, err) + } + log.Infof("Updated status on ingress %s", keyName) + } + return nil +} + // GetService returns the named service from the given namespace. func (c *clientImpl) GetService(namespace, name string) (*corev1.Service, bool, error) { var service *corev1.Service diff --git a/provider/kubernetes/client_mock_test.go b/provider/kubernetes/client_mock_test.go index c968e0d56..a855e2388 100644 --- a/provider/kubernetes/client_mock_test.go +++ b/provider/kubernetes/client_mock_test.go @@ -12,9 +12,10 @@ type clientMock struct { endpoints []*corev1.Endpoints watchChan chan interface{} - apiServiceError error - apiSecretError error - apiEndpointsError error + apiServiceError error + apiSecretError error + apiEndpointsError error + apiIngressStatusError error } func (c clientMock) GetIngresses() []*extensionsv1beta1.Ingress { @@ -31,7 +32,7 @@ func (c clientMock) GetService(namespace, name string) (*corev1.Service, bool, e return service, true, nil } } - return nil, false, nil + return nil, false, c.apiServiceError } func (c clientMock) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) { @@ -64,3 +65,7 @@ func (c clientMock) GetSecret(namespace, name string) (*corev1.Secret, bool, err func (c clientMock) WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) { return c.watchChan, nil } + +func (c clientMock) UpdateIngressStatus(namespace, name, ip, hostname string) error { + return c.apiIngressStatusError +} diff --git a/provider/kubernetes/kubernetes.go b/provider/kubernetes/kubernetes.go index 9861a1e27..9896746e2 100644 --- a/provider/kubernetes/kubernetes.go +++ b/provider/kubernetes/kubernetes.go @@ -36,17 +36,25 @@ const ( traefikDefaultIngressClass = "traefik" ) +// IngressEndpoint holds the endpoint information for the Kubernetes provider +type IngressEndpoint struct { + IP string `description:"IP used for Kubernetes Ingress endpoints"` + Hostname string `description:"Hostname used for Kubernetes Ingress endpoints"` + PublishedService string `description:"Published Kubernetes Service to copy status from"` +} + // Provider holds configurations of the provider. type Provider struct { provider.BaseProvider `mapstructure:",squash" export:"true"` - Endpoint string `description:"Kubernetes server endpoint (required for external cluster client)"` - Token string `description:"Kubernetes bearer token (not needed for in-cluster client)"` - CertAuthFilePath string `description:"Kubernetes certificate authority file path (not needed for in-cluster client)"` - DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers" export:"true"` - EnablePassTLSCert bool `description:"Kubernetes enable Pass TLS Client Certs" export:"true"` - Namespaces Namespaces `description:"Kubernetes namespaces" export:"true"` - LabelSelector string `description:"Kubernetes Ingress label selector to use" export:"true"` - IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"` + Endpoint string `description:"Kubernetes server endpoint (required for external cluster client)"` + Token string `description:"Kubernetes bearer token (not needed for in-cluster client)"` + CertAuthFilePath string `description:"Kubernetes certificate authority file path (not needed for in-cluster client)"` + DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers" export:"true"` + EnablePassTLSCert bool `description:"Kubernetes enable Pass TLS Client Certs" export:"true"` + Namespaces Namespaces `description:"Kubernetes namespaces" export:"true"` + LabelSelector string `description:"Kubernetes Ingress label selector to use" export:"true"` + IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"` + IngressEndpoint *IngressEndpoint `description:"Kubernetes Ingress Endpoint"` lastConfiguration safe.Safe } @@ -125,10 +133,12 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s return nil case event := <-eventsChan: log.Debugf("Received Kubernetes event kind %T", event) + templateObjects, err := p.loadIngresses(k8sClient) if err != nil { return err } + if reflect.DeepEqual(p.lastConfiguration.Get(), templateObjects) { log.Debugf("Skipping Kubernetes event kind %T", event) } else { @@ -326,10 +336,53 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error) } } } + + err = p.updateIngressStatus(i, k8sClient) + if err != nil { + log.Errorf("Cannot update Ingress %s/%s due to error: %v", i.Namespace, i.Name, err) + } } return &templateObjects, nil } +func (p *Provider) updateIngressStatus(i *extensionsv1beta1.Ingress, k8sClient Client) error { + // Only process if an IngressEndpoint has been configured + if p.IngressEndpoint == nil { + return nil + } + + if len(p.IngressEndpoint.PublishedService) == 0 { + if len(p.IngressEndpoint.IP) == 0 && len(p.IngressEndpoint.Hostname) == 0 { + return errors.New("publishedService or ip or hostname must be defined") + } + + return k8sClient.UpdateIngressStatus(i.Namespace, i.Name, p.IngressEndpoint.IP, p.IngressEndpoint.Hostname) + } + + serviceInfo := strings.Split(p.IngressEndpoint.PublishedService, "/") + if len(serviceInfo) != 2 { + return fmt.Errorf("invalid publishedService format (expected 'namespace/service' format): %s", p.IngressEndpoint.PublishedService) + } + serviceNamespace, serviceName := serviceInfo[0], serviceInfo[1] + + service, exists, err := k8sClient.GetService(serviceNamespace, serviceName) + if err != nil { + return fmt.Errorf("cannot get service %s, received error: %s", p.IngressEndpoint.PublishedService, err) + } + + if exists && service.Status.LoadBalancer.Ingress == nil { + // service exists, but has no Load Balancer status + log.Debugf("Skipping updating Ingress %s/%s due to service %s having no status set", i.Namespace, i.Name, p.IngressEndpoint.PublishedService) + return nil + } + + if !exists { + return fmt.Errorf("missing service: %s", p.IngressEndpoint.PublishedService) + } + + return k8sClient.UpdateIngressStatus(i.Namespace, i.Name, service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname) +} + func (p *Provider) loadConfig(templateObjects types.Configuration) *types.Configuration { var FuncMap = template.FuncMap{} configuration, err := p.GetConfiguration("templates/kubernetes.tmpl", FuncMap, templateObjects) diff --git a/provider/kubernetes/kubernetes_test.go b/provider/kubernetes/kubernetes_test.go index d9b4f55b3..aecb51ecd 100644 --- a/provider/kubernetes/kubernetes_test.go +++ b/provider/kubernetes/kubernetes_test.go @@ -2020,3 +2020,146 @@ func TestMultiPortServices(t *testing.T) { assert.Equal(t, expected, actual) } + +func TestProviderUpdateIngressStatus(t *testing.T) { + testCases := []struct { + desc string + ingressEndpoint *IngressEndpoint + apiServiceError error + apiIngressStatusError error + expectedError bool + }{ + { + desc: "without IngressEndpoint configuration", + expectedError: false, + }, + { + desc: "without any IngressEndpoint option", + ingressEndpoint: &IngressEndpoint{}, + expectedError: true, + }, + { + desc: "PublishedService - invalid format", + ingressEndpoint: &IngressEndpoint{ + PublishedService: "foo", + }, + expectedError: true, + }, + { + desc: "PublishedService - missing service", + ingressEndpoint: &IngressEndpoint{ + PublishedService: "foo/bar", + }, + expectedError: true, + }, + { + desc: "PublishedService - get service error", + ingressEndpoint: &IngressEndpoint{ + PublishedService: "foo/bar", + }, + apiServiceError: errors.New("error"), + expectedError: true, + }, + { + desc: "PublishedService - Skipping empty LoadBalancerIngress", + ingressEndpoint: &IngressEndpoint{ + PublishedService: "testing/service-empty-status", + }, + expectedError: false, + }, + { + desc: "PublishedService - with update error", + ingressEndpoint: &IngressEndpoint{ + PublishedService: "testing/service", + }, + apiIngressStatusError: errors.New("error"), + expectedError: true, + }, + { + desc: "PublishedService - right service", + ingressEndpoint: &IngressEndpoint{ + PublishedService: "testing/service", + }, + expectedError: false, + }, + { + desc: "IP - valid", + ingressEndpoint: &IngressEndpoint{ + IP: "127.0.0.1", + }, + expectedError: false, + }, + { + desc: "IP - with update error", + ingressEndpoint: &IngressEndpoint{ + IP: "127.0.0.1", + }, + apiIngressStatusError: errors.New("error"), + expectedError: true, + }, + { + desc: "hostname - valid", + ingressEndpoint: &IngressEndpoint{ + Hostname: "foo", + }, + expectedError: false, + }, + { + desc: "hostname - with update error", + ingressEndpoint: &IngressEndpoint{ + Hostname: "foo", + }, + apiIngressStatusError: errors.New("error"), + expectedError: true, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + p := &Provider{ + IngressEndpoint: test.ingressEndpoint, + } + + services := []*corev1.Service{ + buildService( + sName("service-empty-status"), + sNamespace("testing"), + sLoadBalancerStatus(), + sUID("1"), + sSpec( + clusterIP("10.0.0.1"), + sPorts(sPort(80, ""))), + ), + buildService( + sName("service"), + sNamespace("testing"), + sLoadBalancerStatus(sLoadBalancerIngress("127.0.0.1", "")), + sUID("2"), + sSpec( + clusterIP("10.0.0.2"), + sPorts(sPort(80, ""))), + ), + } + + client := clientMock{ + services: services, + apiServiceError: test.apiServiceError, + apiIngressStatusError: test.apiIngressStatusError, + } + + i := &extensionsv1beta1.Ingress{} + + err := p.updateIngressStatus(i, client) + + if test.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } + +}