Support multi-port services.

This commit is contained in:
Timo Reimann 2018-04-16 14:44:04 +02:00 committed by Traefiker Bot
parent 6b82a77e36
commit 21b8b2deb5
3 changed files with 120 additions and 19 deletions

View file

@ -47,13 +47,13 @@ func subset(opts ...func(*corev1.EndpointSubset)) func(*corev1.Endpoints) {
func eAddresses(opts ...func(*corev1.EndpointAddress)) func(*corev1.EndpointSubset) { func eAddresses(opts ...func(*corev1.EndpointAddress)) func(*corev1.EndpointSubset) {
return func(subset *corev1.EndpointSubset) { return func(subset *corev1.EndpointSubset) {
a := &corev1.EndpointAddress{}
for _, opt := range opts { for _, opt := range opts {
a := &corev1.EndpointAddress{}
opt(a) opt(a)
}
subset.Addresses = append(subset.Addresses, *a) subset.Addresses = append(subset.Addresses, *a)
} }
} }
}
func eAddress(ip string) func(*corev1.EndpointAddress) { func eAddress(ip string) func(*corev1.EndpointAddress) {
return func(address *corev1.EndpointAddress) { return func(address *corev1.EndpointAddress) {

View file

@ -8,7 +8,6 @@ import (
"fmt" "fmt"
"os" "os"
"reflect" "reflect"
"strconv"
"strings" "strings"
"text/template" "text/template"
"time" "time"
@ -301,8 +300,13 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error)
} }
for _, subset := range endpoints.Subsets { for _, subset := range endpoints.Subsets {
endpointPort := endpointPortNumber(port, subset.Ports)
if endpointPort == 0 {
// endpoint port does not match service.
continue
}
for _, address := range subset.Addresses { for _, address := range subset.Addresses {
url := protocol + "://" + address.IP + ":" + strconv.Itoa(endpointPortNumber(port, subset.Ports)) url := fmt.Sprintf("%s://%s:%d", protocol, address.IP, endpointPort)
name := url name := url
if address.TargetRef != nil && address.TargetRef.Name != "" { if address.TargetRef != nil && address.TargetRef.Name != "" {
name = address.TargetRef.Name name = address.TargetRef.Name
@ -468,18 +472,23 @@ func getTLS(ingress *extensionsv1beta1.Ingress, k8sClient Client) ([]*tls.Config
return tlsConfigs, nil return tlsConfigs, nil
} }
func endpointPortNumber(servicePort corev1.ServicePort, endpointPorts []corev1.EndpointPort) int { // endpointPortNumber returns the port to be used for this endpoint. It is zero
if len(endpointPorts) > 0 { // if the endpoint does not match the given service port.
// name is optional if there is only one port func endpointPortNumber(servicePort corev1.ServicePort, endpointPorts []corev1.EndpointPort) int32 {
port := endpointPorts[0] // Is this reasonable to assume?
if len(endpointPorts) == 0 {
return servicePort.Port
}
for _, endpointPort := range endpointPorts { for _, endpointPort := range endpointPorts {
// For matching endpoints, the port names must correspond, either by
// being empty or non-empty. Multi-port services mandate non-empty
// names and allow us to filter for the right addresses.
if servicePort.Name == endpointPort.Name { if servicePort.Name == endpointPort.Name {
port = endpointPort return endpointPort.Port
} }
} }
return int(port.Port) return 0
}
return int(servicePort.Port)
} }
func equalPorts(servicePort corev1.ServicePort, ingressPort intstr.IntOrString) bool { func equalPorts(servicePort corev1.ServicePort, ingressPort intstr.IntOrString) bool {

View file

@ -470,7 +470,7 @@ retryexpression: IsNetworkError() && Attempts() <= 2
`), `),
sSpec( sSpec(
clusterIP("10.0.0.3"), clusterIP("10.0.0.3"),
sPorts(sPort(803, ""))), sPorts(sPort(803, "http"))),
), ),
buildService( buildService(
sName("service4"), sName("service4"),
@ -480,7 +480,7 @@ retryexpression: IsNetworkError() && Attempts() <= 2
sAnnotation(annotationKubernetesMaxConnAmount, "6"), sAnnotation(annotationKubernetesMaxConnAmount, "6"),
sSpec( sSpec(
clusterIP("10.0.0.4"), clusterIP("10.0.0.4"),
sPorts(sPort(804, ""))), sPorts(sPort(804, "http"))),
), ),
} }
@ -502,10 +502,10 @@ retryexpression: IsNetworkError() && Attempts() <= 2
eUID("2"), eUID("2"),
subset( subset(
eAddresses(eAddress("10.15.0.1")), eAddresses(eAddress("10.15.0.1")),
ePorts(ePort(8080, "http"))), ePorts(ePort(8080, ""))),
subset( subset(
eAddresses(eAddress("10.15.0.2")), eAddresses(eAddress("10.15.0.2")),
ePorts(ePort(8080, "http"))), ePorts(ePort(8080, ""))),
), ),
buildEndpoint( buildEndpoint(
eNamespace("testing"), eNamespace("testing"),
@ -1926,3 +1926,95 @@ func TestGetTLS(t *testing.T) {
}) })
} }
} }
func TestMultiPortServices(t *testing.T) {
ingresses := []*extensionsv1beta1.Ingress{
buildIngress(
iNamespace("testing"),
iRules(
iRule(iPaths(
onePath(iPath("/cheddar"), iBackend("service", intstr.FromString("cheddar"))),
onePath(iPath("/stilton"), iBackend("service", intstr.FromString("stilton"))),
)),
),
),
}
services := []*corev1.Service{
buildService(
sName("service"),
sNamespace("testing"),
sUID("1"),
sSpec(
clusterIP("10.0.0.1"),
sPorts(sPort(80, "cheddar")),
sPorts(sPort(81, "stilton")),
),
),
}
endpoints := []*corev1.Endpoints{
buildEndpoint(
eNamespace("testing"),
eName("service"),
eUID("1"),
subset(
eAddresses(
eAddress("10.10.0.1"),
eAddress("10.10.0.2"),
),
ePorts(ePort(8080, "cheddar")),
),
subset(
eAddresses(
eAddress("10.20.0.1"),
eAddress("10.20.0.2"),
),
ePorts(ePort(8081, "stilton")),
),
),
}
watchChan := make(chan interface{})
client := clientMock{
ingresses: ingresses,
services: services,
endpoints: endpoints,
watchChan: watchChan,
}
provider := Provider{}
actual, err := provider.loadIngresses(client)
require.NoError(t, err, "error loading ingresses")
expected := buildConfiguration(
backends(
backend("/cheddar",
lbMethod("wrr"),
servers(
server("http://10.10.0.1:8080", weight(1)),
server("http://10.10.0.2:8080", weight(1)),
),
),
backend("/stilton",
lbMethod("wrr"),
servers(
server("http://10.20.0.1:8081", weight(1)),
server("http://10.20.0.2:8081", weight(1)),
),
),
),
frontends(
frontend("/cheddar",
passHostHeader(),
routes(route("/cheddar", "PathPrefix:/cheddar")),
),
frontend("/stilton",
passHostHeader(),
routes(route("/stilton", "PathPrefix:/stilton")),
),
),
)
assert.Equal(t, expected, actual)
}