From 21b8b2deb552e01d163e12ba2e587786573e7962 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Mon, 16 Apr 2018 14:44:04 +0200 Subject: [PATCH] Support multi-port services. --- provider/kubernetes/builder_endpoint_test.go | 4 +- provider/kubernetes/kubernetes.go | 35 ++++--- provider/kubernetes/kubernetes_test.go | 100 ++++++++++++++++++- 3 files changed, 120 insertions(+), 19 deletions(-) diff --git a/provider/kubernetes/builder_endpoint_test.go b/provider/kubernetes/builder_endpoint_test.go index e1df23582..84de61c3c 100644 --- a/provider/kubernetes/builder_endpoint_test.go +++ b/provider/kubernetes/builder_endpoint_test.go @@ -47,11 +47,11 @@ func subset(opts ...func(*corev1.EndpointSubset)) func(*corev1.Endpoints) { func eAddresses(opts ...func(*corev1.EndpointAddress)) func(*corev1.EndpointSubset) { return func(subset *corev1.EndpointSubset) { - a := &corev1.EndpointAddress{} for _, opt := range opts { + a := &corev1.EndpointAddress{} opt(a) + subset.Addresses = append(subset.Addresses, *a) } - subset.Addresses = append(subset.Addresses, *a) } } diff --git a/provider/kubernetes/kubernetes.go b/provider/kubernetes/kubernetes.go index 41c944a8a..242b7e3f2 100644 --- a/provider/kubernetes/kubernetes.go +++ b/provider/kubernetes/kubernetes.go @@ -8,7 +8,6 @@ import ( "fmt" "os" "reflect" - "strconv" "strings" "text/template" "time" @@ -301,8 +300,13 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error) } for _, subset := range endpoints.Subsets { + endpointPort := endpointPortNumber(port, subset.Ports) + if endpointPort == 0 { + // endpoint port does not match service. + continue + } for _, address := range subset.Addresses { - url := protocol + "://" + address.IP + ":" + strconv.Itoa(endpointPortNumber(port, subset.Ports)) + url := fmt.Sprintf("%s://%s:%d", protocol, address.IP, endpointPort) name := url if address.TargetRef != nil && address.TargetRef.Name != "" { name = address.TargetRef.Name @@ -468,18 +472,23 @@ func getTLS(ingress *extensionsv1beta1.Ingress, k8sClient Client) ([]*tls.Config return tlsConfigs, nil } -func endpointPortNumber(servicePort corev1.ServicePort, endpointPorts []corev1.EndpointPort) int { - if len(endpointPorts) > 0 { - // name is optional if there is only one port - port := endpointPorts[0] - for _, endpointPort := range endpointPorts { - if servicePort.Name == endpointPort.Name { - port = endpointPort - } - } - return int(port.Port) +// endpointPortNumber returns the port to be used for this endpoint. It is zero +// if the endpoint does not match the given service port. +func endpointPortNumber(servicePort corev1.ServicePort, endpointPorts []corev1.EndpointPort) int32 { + // Is this reasonable to assume? + if len(endpointPorts) == 0 { + return servicePort.Port } - return int(servicePort.Port) + + for _, endpointPort := range endpointPorts { + // For matching endpoints, the port names must correspond, either by + // being empty or non-empty. Multi-port services mandate non-empty + // names and allow us to filter for the right addresses. + if servicePort.Name == endpointPort.Name { + return endpointPort.Port + } + } + return 0 } func equalPorts(servicePort corev1.ServicePort, ingressPort intstr.IntOrString) bool { diff --git a/provider/kubernetes/kubernetes_test.go b/provider/kubernetes/kubernetes_test.go index 74b11e20a..b86c795d2 100644 --- a/provider/kubernetes/kubernetes_test.go +++ b/provider/kubernetes/kubernetes_test.go @@ -470,7 +470,7 @@ retryexpression: IsNetworkError() && Attempts() <= 2 `), sSpec( clusterIP("10.0.0.3"), - sPorts(sPort(803, ""))), + sPorts(sPort(803, "http"))), ), buildService( sName("service4"), @@ -480,7 +480,7 @@ retryexpression: IsNetworkError() && Attempts() <= 2 sAnnotation(annotationKubernetesMaxConnAmount, "6"), sSpec( clusterIP("10.0.0.4"), - sPorts(sPort(804, ""))), + sPorts(sPort(804, "http"))), ), } @@ -502,10 +502,10 @@ retryexpression: IsNetworkError() && Attempts() <= 2 eUID("2"), subset( eAddresses(eAddress("10.15.0.1")), - ePorts(ePort(8080, "http"))), + ePorts(ePort(8080, ""))), subset( eAddresses(eAddress("10.15.0.2")), - ePorts(ePort(8080, "http"))), + ePorts(ePort(8080, ""))), ), buildEndpoint( eNamespace("testing"), @@ -1926,3 +1926,95 @@ func TestGetTLS(t *testing.T) { }) } } + +func TestMultiPortServices(t *testing.T) { + ingresses := []*extensionsv1beta1.Ingress{ + buildIngress( + iNamespace("testing"), + iRules( + iRule(iPaths( + onePath(iPath("/cheddar"), iBackend("service", intstr.FromString("cheddar"))), + onePath(iPath("/stilton"), iBackend("service", intstr.FromString("stilton"))), + )), + ), + ), + } + + services := []*corev1.Service{ + buildService( + sName("service"), + sNamespace("testing"), + sUID("1"), + sSpec( + clusterIP("10.0.0.1"), + sPorts(sPort(80, "cheddar")), + sPorts(sPort(81, "stilton")), + ), + ), + } + + endpoints := []*corev1.Endpoints{ + buildEndpoint( + eNamespace("testing"), + eName("service"), + eUID("1"), + subset( + eAddresses( + eAddress("10.10.0.1"), + eAddress("10.10.0.2"), + ), + ePorts(ePort(8080, "cheddar")), + ), + subset( + eAddresses( + eAddress("10.20.0.1"), + eAddress("10.20.0.2"), + ), + ePorts(ePort(8081, "stilton")), + ), + ), + } + + watchChan := make(chan interface{}) + client := clientMock{ + ingresses: ingresses, + services: services, + endpoints: endpoints, + watchChan: watchChan, + } + provider := Provider{} + + actual, err := provider.loadIngresses(client) + require.NoError(t, err, "error loading ingresses") + + expected := buildConfiguration( + backends( + backend("/cheddar", + lbMethod("wrr"), + servers( + server("http://10.10.0.1:8080", weight(1)), + server("http://10.10.0.2:8080", weight(1)), + ), + ), + backend("/stilton", + lbMethod("wrr"), + servers( + server("http://10.20.0.1:8081", weight(1)), + server("http://10.20.0.2:8081", weight(1)), + ), + ), + ), + frontends( + frontend("/cheddar", + passHostHeader(), + routes(route("/cheddar", "PathPrefix:/cheddar")), + ), + frontend("/stilton", + passHostHeader(), + routes(route("/stilton", "PathPrefix:/stilton")), + ), + ), + ) + + assert.Equal(t, expected, actual) +}