Abort Kubernetes Ingress update if Kubernetes API call fails (#1295)

* Abort Kubernetes Ingress update if Kubernetes API call fails

Currently if a Kubernetes API call fails we potentially remove a working service from Traefik. This changes it so if a Kubernetes API call fails we abort out of the ingress update and use the current working config. Github issue: #1240

Also added a test to cover when requested resources (services and endpoints) that the user has specified don’t exist.

* Specifically capturing the tc range as documented here: https://blog.golang.org/subtests

* Updating service names in the mock data to be more clear

* Updated expected data to match what currently happens in the loadIngress

* Adding a blank Servers to the expected output so we compare against that instead of nil.

* Replacing the JSON test output with spew for the TestMissingResources test to help ensure we have useful output incase of failures

* Adding a temporary fix to the GetEndoints mocked function so we can override the return value for if the endpoints exist.

After the 1.2 release the use of properExists should be removed and the GetEndpoints function should return false for the second value indicating the endpoint doesn’t exist. However at this time that would break a lot of the tests.

* Adding quick TODO line about removing the properExists property

* Link to issue 1307 re: properExists flag.
This commit is contained in:
Regner Blok-Andersen 2017-03-17 08:34:34 -07:00 committed by Emile Vauge
parent b99a919bb4
commit b02393915e
No known key found for this signature in database
GPG key ID: D808B4C167352E59
2 changed files with 348 additions and 53 deletions

View file

@ -198,8 +198,13 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
}
service, exists, err := k8sClient.GetService(i.ObjectMeta.Namespace, pa.Backend.ServiceName)
if err != nil || !exists {
log.Warnf("Error retrieving service %s/%s: %v", i.ObjectMeta.Namespace, pa.Backend.ServiceName, err)
if err != nil {
log.Errorf("Error while retrieving service information from k8s API %s/%s: %v", service.ObjectMeta.Namespace, pa.Backend.ServiceName, err)
return nil, err
}
if !exists {
log.Errorf("Service not found for %s/%s", service.ObjectMeta.Namespace, pa.Backend.ServiceName)
delete(templateObjects.Frontends, r.Host+pa.Path)
continue
}
@ -232,18 +237,18 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
}
} else {
endpoints, exists, err := k8sClient.GetEndpoints(service.ObjectMeta.Namespace, service.ObjectMeta.Name)
if err != nil {
log.Errorf("Error while retrieving endpoints from k8s API %s/%s: %v", service.ObjectMeta.Namespace, service.ObjectMeta.Name, err)
if err != nil {
log.Errorf("Error retrieving endpoints %s/%s: %v", service.ObjectMeta.Namespace, service.ObjectMeta.Name, err)
return nil, err
}
if !exists {
log.Errorf("Endpoints not found for %s/%s", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
continue
}
if !exists {
log.Errorf("Service not found for %s/%s", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
continue
}
if len(endpoints.Subsets) == 0 {
log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
log.Warnf("Service endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{
URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(int(port.Port)),
Weight: 1,

View file

@ -2,6 +2,7 @@ package provider
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
@ -9,6 +10,7 @@ import (
"github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/types"
"github.com/davecgh/go-spew/spew"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/pkg/util/intstr"
@ -339,8 +341,8 @@ func TestRuleType(t *testing.T) {
desc: "implicit default",
ingressRuleType: "",
frontendRuleType: ruleTypePathPrefix,
},
{
},
{
desc: "unknown ingress / explicit default",
ingressRuleType: "unknown",
frontendRuleType: ruleTypePathPrefix,
@ -349,7 +351,7 @@ func TestRuleType(t *testing.T) {
desc: "explicit ingress",
ingressRuleType: ruleTypePath,
frontendRuleType: ruleTypePath,
},
},
}
for _, test := range tests {
@ -357,27 +359,27 @@ func TestRuleType(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
ingress := &v1beta1.Ingress{
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "host",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/path",
Backend: v1beta1.IngressBackend{
Backend: v1beta1.IngressBackend{
ServiceName: "service",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
},
},
}
},
},
}
if test.ingressRuleType != "" {
ingress.ObjectMeta.Annotations = map[string]string{
@ -386,54 +388,54 @@ func TestRuleType(t *testing.T) {
}
service := &v1.Service{
ObjectMeta: v1.ObjectMeta{
ObjectMeta: v1.ObjectMeta{
Name: "service",
UID: "1",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []v1.ServicePort{
{
Name: "http",
Port: 801,
},
UID: "1",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []v1.ServicePort{
{
Name: "http",
Port: 801,
},
},
}
},
}
watchChan := make(chan interface{})
client := clientMock{
watchChan := make(chan interface{})
client := clientMock{
ingresses: []*v1beta1.Ingress{ingress},
services: []*v1.Service{service},
watchChan: watchChan,
}
provider := Kubernetes{DisablePassHostHeaders: true}
actualConfig, err := provider.loadIngresses(client)
if err != nil {
watchChan: watchChan,
}
provider := Kubernetes{DisablePassHostHeaders: true}
actualConfig, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error loading ingresses: %+v", err)
}
}
actual := actualConfig.Frontends
expected := map[string]*types.Frontend{
expected := map[string]*types.Frontend{
"host/path": {
Backend: "host/path",
Priority: len("/path"),
Routes: map[string]types.Route{
Routes: map[string]types.Route{
"/path": {
Rule: fmt.Sprintf("%s:/path", test.frontendRuleType),
},
},
"host": {
Rule: "Host:host",
},
},
},
}
},
},
}
if !reflect.DeepEqual(expected, actual) {
expectedJSON, _ := json.Marshal(expected)
actualJSON, _ := json.Marshal(actual)
t.Fatalf("expected %+v, got %+v", string(expectedJSON), string(actualJSON))
}
actualJSON, _ := json.Marshal(actual)
t.Fatalf("expected %+v, got %+v", string(expectedJSON), string(actualJSON))
}
})
}
}
@ -1814,11 +1816,286 @@ func TestGetRuleTypeFromAnnotation(t *testing.T) {
}
}
func TestKubeAPIErrors(t *testing.T) {
ingresses := []*v1beta1.Ingress{{
ObjectMeta: v1.ObjectMeta{
Namespace: "testing",
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "foo",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/bar",
Backend: v1beta1.IngressBackend{
ServiceName: "service1",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
},
},
}}
services := []*v1.Service{{
ObjectMeta: v1.ObjectMeta{
Name: "service1",
UID: "1",
Namespace: "testing",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []v1.ServicePort{
{
Port: 80,
},
},
},
}}
endpoints := []*v1.Endpoints{}
watchChan := make(chan interface{})
apiErr := errors.New("failed kube api call")
testCases := []struct {
desc string
apiServiceErr error
apiEndpointsErr error
}{
{
desc: "failed service call",
apiServiceErr: apiErr,
},
{
desc: "failed endpoints call",
apiEndpointsErr: apiErr,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
client := clientMock{
ingresses: ingresses,
services: services,
endpoints: endpoints,
watchChan: watchChan,
apiServiceError: tc.apiServiceErr,
apiEndpointsError: tc.apiEndpointsErr,
}
provider := Kubernetes{}
if _, err := provider.loadIngresses(client); err != apiErr {
t.Errorf("Got error %v, wanted error %v", err, apiErr)
}
})
}
}
func TestMissingResources(t *testing.T) {
ingresses := []*v1beta1.Ingress{{
ObjectMeta: v1.ObjectMeta{
Namespace: "testing",
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "fully_working",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Backend: v1beta1.IngressBackend{
ServiceName: "fully_working_service",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
{
Host: "missing_service",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Backend: v1beta1.IngressBackend{
ServiceName: "missing_service_service",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
{
Host: "missing_endpoints",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Backend: v1beta1.IngressBackend{
ServiceName: "missing_endpoints_service",
ServicePort: intstr.FromInt(80),
},
},
},
},
},
},
},
},
}}
services := []*v1.Service{
{
ObjectMeta: v1.ObjectMeta{
Name: "fully_working_service",
UID: "1",
Namespace: "testing",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []v1.ServicePort{
{
Port: 80,
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Name: "missing_endpoints_service",
UID: "3",
Namespace: "testing",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.3",
Ports: []v1.ServicePort{
{
Port: 80,
},
},
},
},
}
endpoints := []*v1.Endpoints{
{
ObjectMeta: v1.ObjectMeta{
Name: "fully_working_service",
UID: "1",
Namespace: "testing",
},
Subsets: []v1.EndpointSubset{
{
Addresses: []v1.EndpointAddress{
{
IP: "10.10.0.1",
},
},
Ports: []v1.EndpointPort{
{
Port: 8080,
},
},
},
},
},
}
watchChan := make(chan interface{})
client := clientMock{
ingresses: ingresses,
services: services,
endpoints: endpoints,
watchChan: watchChan,
// TODO: Update all tests to cope with "properExists == true" correctly and remove flag.
// See https://github.com/containous/traefik/issues/1307
properExists: true,
}
provider := Kubernetes{}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
}
expected := &types.Configuration{
Backends: map[string]*types.Backend{
"fully_working": {
Servers: map[string]types.Server{
"http://10.10.0.1:8080": {
URL: "http://10.10.0.1:8080",
Weight: 1,
},
},
CircuitBreaker: nil,
LoadBalancer: &types.LoadBalancer{
Method: "wrr",
Sticky: false,
},
},
"missing_service": {
Servers: map[string]types.Server{},
LoadBalancer: &types.LoadBalancer{
Method: "wrr",
Sticky: false,
},
},
"missing_endpoints": {
Servers: map[string]types.Server{},
CircuitBreaker: nil,
LoadBalancer: &types.LoadBalancer{
Method: "wrr",
Sticky: false,
},
},
},
Frontends: map[string]*types.Frontend{
"fully_working": {
Backend: "fully_working",
PassHostHeader: true,
Routes: map[string]types.Route{
"fully_working": {
Rule: "Host:fully_working",
},
},
},
"missing_endpoints": {
Backend: "missing_endpoints",
PassHostHeader: true,
Routes: map[string]types.Route{
"missing_endpoints": {
Rule: "Host:missing_endpoints",
},
},
},
},
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("expected\n%v\ngot\n\n%v", spew.Sdump(expected), spew.Sdump(actual))
}
}
type clientMock struct {
ingresses []*v1beta1.Ingress
services []*v1.Service
endpoints []*v1.Endpoints
watchChan chan interface{}
apiServiceError error
apiEndpointsError error
properExists bool
}
func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress {
@ -1833,20 +2110,33 @@ func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress {
}
func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error) {
if c.apiServiceError != nil {
return &v1.Service{}, false, c.apiServiceError
}
for _, service := range c.services {
if service.Namespace == namespace && service.Name == name {
return service, true, nil
}
}
return &v1.Service{}, true, nil
return &v1.Service{}, false, nil
}
func (c clientMock) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error) {
if c.apiEndpointsError != nil {
return &v1.Endpoints{}, false, c.apiEndpointsError
}
for _, endpoints := range c.endpoints {
if endpoints.Namespace == namespace && endpoints.Name == name {
return endpoints, true, nil
}
}
if c.properExists {
return &v1.Endpoints{}, false, nil
}
return &v1.Endpoints{}, true, nil
}