From b02393915e7179f9a0be7d81a44d7a99ea39b741 Mon Sep 17 00:00:00 2001 From: Regner Blok-Andersen Date: Fri, 17 Mar 2017 08:34:34 -0700 Subject: [PATCH] Abort Kubernetes Ingress update if Kubernetes API call fails (#1295) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Abort Kubernetes Ingress update if Kubernetes API call fails Currently if a Kubernetes API call fails we potentially remove a working service from Traefik. This changes it so if a Kubernetes API call fails we abort out of the ingress update and use the current working config. Github issue: #1240 Also added a test to cover when requested resources (services and endpoints) that the user has specified don’t exist. * Specifically capturing the tc range as documented here: https://blog.golang.org/subtests * Updating service names in the mock data to be more clear * Updated expected data to match what currently happens in the loadIngress * Adding a blank Servers to the expected output so we compare against that instead of nil. * Replacing the JSON test output with spew for the TestMissingResources test to help ensure we have useful output incase of failures * Adding a temporary fix to the GetEndoints mocked function so we can override the return value for if the endpoints exist. After the 1.2 release the use of properExists should be removed and the GetEndpoints function should return false for the second value indicating the endpoint doesn’t exist. However at this time that would break a lot of the tests. * Adding quick TODO line about removing the properExists property * Link to issue 1307 re: properExists flag. --- provider/kubernetes.go | 25 ++- provider/kubernetes_test.go | 376 +++++++++++++++++++++++++++++++----- 2 files changed, 348 insertions(+), 53 deletions(-) diff --git a/provider/kubernetes.go b/provider/kubernetes.go index 2b7c24195..d5b652818 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -198,8 +198,13 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur } service, exists, err := k8sClient.GetService(i.ObjectMeta.Namespace, pa.Backend.ServiceName) - if err != nil || !exists { - log.Warnf("Error retrieving service %s/%s: %v", i.ObjectMeta.Namespace, pa.Backend.ServiceName, err) + if err != nil { + log.Errorf("Error while retrieving service information from k8s API %s/%s: %v", service.ObjectMeta.Namespace, pa.Backend.ServiceName, err) + return nil, err + } + + if !exists { + log.Errorf("Service not found for %s/%s", service.ObjectMeta.Namespace, pa.Backend.ServiceName) delete(templateObjects.Frontends, r.Host+pa.Path) continue } @@ -232,18 +237,18 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur } } else { endpoints, exists, err := k8sClient.GetEndpoints(service.ObjectMeta.Namespace, service.ObjectMeta.Name) - if err != nil { - log.Errorf("Error while retrieving endpoints from k8s API %s/%s: %v", service.ObjectMeta.Namespace, service.ObjectMeta.Name, err) + if err != nil { + log.Errorf("Error retrieving endpoints %s/%s: %v", service.ObjectMeta.Namespace, service.ObjectMeta.Name, err) + return nil, err + } + + if !exists { + log.Errorf("Endpoints not found for %s/%s", service.ObjectMeta.Namespace, service.ObjectMeta.Name) continue } - if !exists { - log.Errorf("Service not found for %s/%s", service.ObjectMeta.Namespace, service.ObjectMeta.Name) - continue - } - if len(endpoints.Subsets) == 0 { - log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name) + log.Warnf("Service endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name) templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{ URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(int(port.Port)), Weight: 1, diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go index 086322a0a..1f4d9578e 100644 --- a/provider/kubernetes_test.go +++ b/provider/kubernetes_test.go @@ -2,6 +2,7 @@ package provider import ( "encoding/json" + "errors" "fmt" "reflect" "strings" @@ -9,6 +10,7 @@ import ( "github.com/containous/traefik/provider/k8s" "github.com/containous/traefik/types" + "github.com/davecgh/go-spew/spew" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/apis/extensions/v1beta1" "k8s.io/client-go/pkg/util/intstr" @@ -339,8 +341,8 @@ func TestRuleType(t *testing.T) { desc: "implicit default", ingressRuleType: "", frontendRuleType: ruleTypePathPrefix, - }, - { + }, + { desc: "unknown ingress / explicit default", ingressRuleType: "unknown", frontendRuleType: ruleTypePathPrefix, @@ -349,7 +351,7 @@ func TestRuleType(t *testing.T) { desc: "explicit ingress", ingressRuleType: ruleTypePath, frontendRuleType: ruleTypePath, - }, + }, } for _, test := range tests { @@ -357,27 +359,27 @@ func TestRuleType(t *testing.T) { t.Run(test.desc, func(t *testing.T) { t.Parallel() ingress := &v1beta1.Ingress{ - Spec: v1beta1.IngressSpec{ - Rules: []v1beta1.IngressRule{ - { + Spec: v1beta1.IngressSpec{ + Rules: []v1beta1.IngressRule{ + { Host: "host", - IngressRuleValue: v1beta1.IngressRuleValue{ - HTTP: &v1beta1.HTTPIngressRuleValue{ - Paths: []v1beta1.HTTPIngressPath{ - { + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { Path: "/path", - Backend: v1beta1.IngressBackend{ + Backend: v1beta1.IngressBackend{ ServiceName: "service", ServicePort: intstr.FromInt(80), - }, - }, }, }, }, }, }, }, - } + }, + }, + } if test.ingressRuleType != "" { ingress.ObjectMeta.Annotations = map[string]string{ @@ -386,54 +388,54 @@ func TestRuleType(t *testing.T) { } service := &v1.Service{ - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: v1.ObjectMeta{ Name: "service", - UID: "1", - }, - Spec: v1.ServiceSpec{ - ClusterIP: "10.0.0.1", - Ports: []v1.ServicePort{ - { - Name: "http", - Port: 801, - }, + UID: "1", + }, + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Ports: []v1.ServicePort{ + { + Name: "http", + Port: 801, }, }, - } + }, + } - watchChan := make(chan interface{}) - client := clientMock{ + watchChan := make(chan interface{}) + client := clientMock{ ingresses: []*v1beta1.Ingress{ingress}, services: []*v1.Service{service}, - watchChan: watchChan, - } - provider := Kubernetes{DisablePassHostHeaders: true} - actualConfig, err := provider.loadIngresses(client) - if err != nil { + watchChan: watchChan, + } + provider := Kubernetes{DisablePassHostHeaders: true} + actualConfig, err := provider.loadIngresses(client) + if err != nil { t.Fatalf("error loading ingresses: %+v", err) - } + } actual := actualConfig.Frontends - expected := map[string]*types.Frontend{ + expected := map[string]*types.Frontend{ "host/path": { Backend: "host/path", Priority: len("/path"), - Routes: map[string]types.Route{ + Routes: map[string]types.Route{ "/path": { Rule: fmt.Sprintf("%s:/path", test.frontendRuleType), - }, + }, "host": { Rule: "Host:host", - }, - }, }, - } + }, + }, + } if !reflect.DeepEqual(expected, actual) { expectedJSON, _ := json.Marshal(expected) - actualJSON, _ := json.Marshal(actual) - t.Fatalf("expected %+v, got %+v", string(expectedJSON), string(actualJSON)) - } + actualJSON, _ := json.Marshal(actual) + t.Fatalf("expected %+v, got %+v", string(expectedJSON), string(actualJSON)) + } }) } } @@ -1814,11 +1816,286 @@ func TestGetRuleTypeFromAnnotation(t *testing.T) { } } +func TestKubeAPIErrors(t *testing.T) { + ingresses := []*v1beta1.Ingress{{ + ObjectMeta: v1.ObjectMeta{ + Namespace: "testing", + }, + Spec: v1beta1.IngressSpec{ + Rules: []v1beta1.IngressRule{ + { + Host: "foo", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Path: "/bar", + Backend: v1beta1.IngressBackend{ + ServiceName: "service1", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + }, + }} + + services := []*v1.Service{{ + ObjectMeta: v1.ObjectMeta{ + Name: "service1", + UID: "1", + Namespace: "testing", + }, + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Ports: []v1.ServicePort{ + { + Port: 80, + }, + }, + }, + }} + + endpoints := []*v1.Endpoints{} + watchChan := make(chan interface{}) + apiErr := errors.New("failed kube api call") + + testCases := []struct { + desc string + apiServiceErr error + apiEndpointsErr error + }{ + { + desc: "failed service call", + apiServiceErr: apiErr, + }, + { + desc: "failed endpoints call", + apiEndpointsErr: apiErr, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + client := clientMock{ + ingresses: ingresses, + services: services, + endpoints: endpoints, + watchChan: watchChan, + apiServiceError: tc.apiServiceErr, + apiEndpointsError: tc.apiEndpointsErr, + } + + provider := Kubernetes{} + if _, err := provider.loadIngresses(client); err != apiErr { + t.Errorf("Got error %v, wanted error %v", err, apiErr) + } + }) + } +} + +func TestMissingResources(t *testing.T) { + ingresses := []*v1beta1.Ingress{{ + ObjectMeta: v1.ObjectMeta{ + Namespace: "testing", + }, + Spec: v1beta1.IngressSpec{ + Rules: []v1beta1.IngressRule{ + { + Host: "fully_working", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Backend: v1beta1.IngressBackend{ + ServiceName: "fully_working_service", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + { + Host: "missing_service", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Backend: v1beta1.IngressBackend{ + ServiceName: "missing_service_service", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + { + Host: "missing_endpoints", + IngressRuleValue: v1beta1.IngressRuleValue{ + HTTP: &v1beta1.HTTPIngressRuleValue{ + Paths: []v1beta1.HTTPIngressPath{ + { + Backend: v1beta1.IngressBackend{ + ServiceName: "missing_endpoints_service", + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + }, + }} + services := []*v1.Service{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "fully_working_service", + UID: "1", + Namespace: "testing", + }, + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + Ports: []v1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "missing_endpoints_service", + UID: "3", + Namespace: "testing", + }, + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.3", + Ports: []v1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + } + endpoints := []*v1.Endpoints{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "fully_working_service", + UID: "1", + Namespace: "testing", + }, + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ + { + IP: "10.10.0.1", + }, + }, + Ports: []v1.EndpointPort{ + { + Port: 8080, + }, + }, + }, + }, + }, + } + + watchChan := make(chan interface{}) + client := clientMock{ + ingresses: ingresses, + services: services, + endpoints: endpoints, + watchChan: watchChan, + + // TODO: Update all tests to cope with "properExists == true" correctly and remove flag. + // See https://github.com/containous/traefik/issues/1307 + properExists: true, + } + provider := Kubernetes{} + actual, err := provider.loadIngresses(client) + if err != nil { + t.Fatalf("error %+v", err) + } + + expected := &types.Configuration{ + Backends: map[string]*types.Backend{ + "fully_working": { + Servers: map[string]types.Server{ + "http://10.10.0.1:8080": { + URL: "http://10.10.0.1:8080", + Weight: 1, + }, + }, + CircuitBreaker: nil, + LoadBalancer: &types.LoadBalancer{ + Method: "wrr", + Sticky: false, + }, + }, + "missing_service": { + Servers: map[string]types.Server{}, + LoadBalancer: &types.LoadBalancer{ + Method: "wrr", + Sticky: false, + }, + }, + "missing_endpoints": { + Servers: map[string]types.Server{}, + CircuitBreaker: nil, + LoadBalancer: &types.LoadBalancer{ + Method: "wrr", + Sticky: false, + }, + }, + }, + Frontends: map[string]*types.Frontend{ + "fully_working": { + Backend: "fully_working", + PassHostHeader: true, + Routes: map[string]types.Route{ + "fully_working": { + Rule: "Host:fully_working", + }, + }, + }, + "missing_endpoints": { + Backend: "missing_endpoints", + PassHostHeader: true, + Routes: map[string]types.Route{ + "missing_endpoints": { + Rule: "Host:missing_endpoints", + }, + }, + }, + }, + } + + if !reflect.DeepEqual(actual, expected) { + t.Fatalf("expected\n%v\ngot\n\n%v", spew.Sdump(expected), spew.Sdump(actual)) + } +} + type clientMock struct { ingresses []*v1beta1.Ingress services []*v1.Service endpoints []*v1.Endpoints watchChan chan interface{} + + apiServiceError error + apiEndpointsError error + + properExists bool } func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress { @@ -1833,20 +2110,33 @@ func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress { } func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error) { + if c.apiServiceError != nil { + return &v1.Service{}, false, c.apiServiceError + } + for _, service := range c.services { if service.Namespace == namespace && service.Name == name { return service, true, nil } } - return &v1.Service{}, true, nil + return &v1.Service{}, false, nil } func (c clientMock) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error) { + if c.apiEndpointsError != nil { + return &v1.Endpoints{}, false, c.apiEndpointsError + } + for _, endpoints := range c.endpoints { if endpoints.Namespace == namespace && endpoints.Name == name { return endpoints, true, nil } } + + if c.properExists { + return &v1.Endpoints{}, false, nil + } + return &v1.Endpoints{}, true, nil }