Add support for backend protocol selection in HTTP and GRPC routes
This commit is contained in:
parent
9dc2155e63
commit
e222d5cb2f
9 changed files with 495 additions and 219 deletions
|
@ -213,6 +213,8 @@ func (s *K8sConformanceSuite) TestK8sGatewayAPIConformance() {
|
|||
features.SupportHTTPRoutePathRedirect,
|
||||
features.SupportHTTPRouteResponseHeaderModification,
|
||||
features.SupportTLSRoute,
|
||||
features.SupportHTTPRouteBackendProtocolH2C,
|
||||
features.SupportHTTPRouteBackendProtocolWebSocket,
|
||||
),
|
||||
})
|
||||
require.NoError(s.T(), err)
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
---
|
||||
kind: GatewayClass
|
||||
apiVersion: gateway.networking.k8s.io/v1
|
||||
metadata:
|
||||
name: my-gateway-class
|
||||
spec:
|
||||
controllerName: traefik.io/gateway-controller
|
||||
|
||||
---
|
||||
kind: Gateway
|
||||
apiVersion: gateway.networking.k8s.io/v1
|
||||
metadata:
|
||||
name: my-gateway
|
||||
namespace: default
|
||||
spec:
|
||||
gatewayClassName: my-gateway-class
|
||||
listeners: # Use GatewayClass defaults for listener definition.
|
||||
- name: http
|
||||
protocol: HTTP
|
||||
port: 80
|
||||
allowedRoutes:
|
||||
kinds:
|
||||
- kind: HTTPRoute
|
||||
group: gateway.networking.k8s.io
|
||||
namespaces:
|
||||
from: Same
|
||||
|
||||
---
|
||||
kind: HTTPRoute
|
||||
apiVersion: gateway.networking.k8s.io/v1
|
||||
metadata:
|
||||
name: http-multi-protocols
|
||||
namespace: default
|
||||
spec:
|
||||
parentRefs:
|
||||
- name: my-gateway
|
||||
kind: Gateway
|
||||
group: gateway.networking.k8s.io
|
||||
hostnames:
|
||||
- "foo.com"
|
||||
rules:
|
||||
- matches:
|
||||
- path:
|
||||
type: Exact
|
||||
value: /bar
|
||||
backendRefs:
|
||||
- name: whoami-h2c
|
||||
port: 80
|
||||
weight: 1
|
||||
kind: Service
|
||||
group: ""
|
||||
- name: whoami-ws
|
||||
port: 80
|
||||
weight: 1
|
||||
kind: Service
|
||||
group: ""
|
||||
- name: whoami-wss
|
||||
port: 80
|
||||
weight: 1
|
||||
kind: Service
|
||||
group: ""
|
|
@ -7,9 +7,11 @@ metadata:
|
|||
spec:
|
||||
ports:
|
||||
- name: web2
|
||||
protocol: TCP
|
||||
port: 8000
|
||||
targetPort: web2
|
||||
- name: web
|
||||
protocol: TCP
|
||||
port: 80
|
||||
targetPort: web
|
||||
selector:
|
||||
|
@ -48,9 +50,11 @@ metadata:
|
|||
spec:
|
||||
ports:
|
||||
- name: web2
|
||||
protocol: TCP
|
||||
port: 8000
|
||||
targetPort: web2
|
||||
- name: web
|
||||
protocol: TCP
|
||||
port: 80
|
||||
targetPort: web
|
||||
selector:
|
||||
|
@ -89,6 +93,7 @@ metadata:
|
|||
spec:
|
||||
ports:
|
||||
- name: web
|
||||
protocol: TCP
|
||||
port: 8080
|
||||
targetPort: web
|
||||
selector:
|
||||
|
@ -317,3 +322,105 @@ status:
|
|||
ingress:
|
||||
- hostname: foo.bar
|
||||
- ip: 1.2.3.4
|
||||
|
||||
---
|
||||
kind: EndpointSlice
|
||||
apiVersion: discovery.k8s.io/v1
|
||||
metadata:
|
||||
name: whoami-h2c
|
||||
namespace: default
|
||||
labels:
|
||||
kubernetes.io/service-name: whoami-h2c
|
||||
|
||||
addressType: IPv4
|
||||
ports:
|
||||
- name: h2c
|
||||
protocol: TCP
|
||||
port: 80
|
||||
endpoints:
|
||||
- addresses:
|
||||
- 10.10.0.13
|
||||
conditions:
|
||||
ready: true
|
||||
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: whoami-h2c
|
||||
namespace: default
|
||||
|
||||
spec:
|
||||
ports:
|
||||
- protocol: TCP
|
||||
port: 80
|
||||
name: h2c
|
||||
appProtocol: kubernetes.io/h2c
|
||||
|
||||
---
|
||||
kind: EndpointSlice
|
||||
apiVersion: discovery.k8s.io/v1
|
||||
metadata:
|
||||
name: whoami-ws
|
||||
namespace: default
|
||||
labels:
|
||||
kubernetes.io/service-name: whoami-ws
|
||||
|
||||
addressType: IPv4
|
||||
ports:
|
||||
- name: ws
|
||||
protocol: TCP
|
||||
port: 80
|
||||
endpoints:
|
||||
- addresses:
|
||||
- 10.10.0.14
|
||||
conditions:
|
||||
ready: true
|
||||
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: whoami-ws
|
||||
namespace: default
|
||||
|
||||
spec:
|
||||
ports:
|
||||
- protocol: TCP
|
||||
port: 80
|
||||
name: ws
|
||||
appProtocol: kubernetes.io/ws
|
||||
|
||||
---
|
||||
kind: EndpointSlice
|
||||
apiVersion: discovery.k8s.io/v1
|
||||
metadata:
|
||||
name: whoami-wss
|
||||
namespace: default
|
||||
labels:
|
||||
kubernetes.io/service-name: whoami-wss
|
||||
|
||||
addressType: IPv4
|
||||
ports:
|
||||
- name: wss
|
||||
protocol: TCP
|
||||
port: 80
|
||||
endpoints:
|
||||
- addresses:
|
||||
- 10.10.0.15
|
||||
conditions:
|
||||
ready: true
|
||||
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Service
|
||||
metadata:
|
||||
name: whoami-wss
|
||||
namespace: default
|
||||
|
||||
spec:
|
||||
ports:
|
||||
- protocol: TCP
|
||||
port: 80
|
||||
name: wss
|
||||
appProtocol: kubernetes.io/wss
|
||||
|
|
|
@ -2,7 +2,6 @@ package gateway
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
@ -260,23 +259,16 @@ func (p *Provider) loadGRPCBackendRef(route *gatev1.GRPCRoute, backendRef gatev1
|
|||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonUnsupportedProtocol),
|
||||
Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s/%s/%s port is required", group, kind, namespace, backendRef.Name),
|
||||
Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s/%s/%s: port is required", group, kind, namespace, backendRef.Name),
|
||||
}
|
||||
}
|
||||
|
||||
portStr := strconv.FormatInt(int64(port), 10)
|
||||
serviceName = provider.Normalize(serviceName + "-" + portStr)
|
||||
|
||||
lb, err := p.loadGRPCServers(namespace, backendRef)
|
||||
if err != nil {
|
||||
return serviceName, nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonBackendNotFound),
|
||||
Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s/%s/%s: %s", group, kind, namespace, backendRef.Name, err),
|
||||
}
|
||||
lb, errCondition := p.loadGRPCServers(namespace, route, backendRef)
|
||||
if errCondition != nil {
|
||||
return serviceName, nil, errCondition
|
||||
}
|
||||
|
||||
return serviceName, &dynamic.Service{LoadBalancer: lb}, nil
|
||||
|
@ -319,72 +311,49 @@ func (p *Provider) loadGRPCMiddlewares(conf *dynamic.Configuration, namespace, r
|
|||
return middlewareNames, nil
|
||||
}
|
||||
|
||||
func (p *Provider) loadGRPCServers(namespace string, backendRef gatev1.GRPCBackendRef) (*dynamic.ServersLoadBalancer, error) {
|
||||
if backendRef.Port == nil {
|
||||
return nil, errors.New("port is required for Kubernetes Service reference")
|
||||
}
|
||||
|
||||
service, exists, err := p.client.GetService(namespace, string(backendRef.Name))
|
||||
func (p *Provider) loadGRPCServers(namespace string, route *gatev1.GRPCRoute, backendRef gatev1.GRPCBackendRef) (*dynamic.ServersLoadBalancer, *metav1.Condition) {
|
||||
backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef.BackendRef)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting service: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return nil, errors.New("service not found")
|
||||
}
|
||||
|
||||
var svcPort *corev1.ServicePort
|
||||
for _, p := range service.Spec.Ports {
|
||||
if p.Port == int32(*backendRef.Port) {
|
||||
svcPort = &p
|
||||
break
|
||||
return nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonBackendNotFound),
|
||||
Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s: %s", namespace, backendRef.Name, err),
|
||||
}
|
||||
}
|
||||
if svcPort == nil {
|
||||
return nil, fmt.Errorf("service port %d not found", *backendRef.Port)
|
||||
|
||||
if svcPort.Protocol != corev1.ProtocolTCP {
|
||||
return nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonUnsupportedProtocol),
|
||||
Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s: only TCP protocol is supported", namespace, backendRef.Name),
|
||||
}
|
||||
}
|
||||
|
||||
endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(backendRef.Name))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting endpointslices: %w", err)
|
||||
}
|
||||
if len(endpointSlices) == 0 {
|
||||
return nil, errors.New("endpointslices not found")
|
||||
if svcPort.AppProtocol != nil && *svcPort.AppProtocol != appProtocolH2C {
|
||||
return nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonUnsupportedProtocol),
|
||||
Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s: only kubernetes.io/h2c appProtocol is supported", namespace, backendRef.Name),
|
||||
}
|
||||
}
|
||||
|
||||
lb := &dynamic.ServersLoadBalancer{}
|
||||
lb.SetDefaults()
|
||||
|
||||
addresses := map[string]struct{}{}
|
||||
for _, endpointSlice := range endpointSlices {
|
||||
var port int32
|
||||
for _, p := range endpointSlice.Ports {
|
||||
if svcPort.Name == *p.Name {
|
||||
port = *p.Port
|
||||
break
|
||||
}
|
||||
}
|
||||
if port == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, endpoint := range endpointSlice.Endpoints {
|
||||
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, address := range endpoint.Addresses {
|
||||
if _, ok := addresses[address]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
addresses[address] = struct{}{}
|
||||
lb.Servers = append(lb.Servers, dynamic.Server{
|
||||
URL: fmt.Sprintf("h2c://%s", net.JoinHostPort(address, strconv.Itoa(int(port)))),
|
||||
})
|
||||
}
|
||||
}
|
||||
for _, ba := range backendAddresses {
|
||||
lb.Servers = append(lb.Servers, dynamic.Server{
|
||||
URL: fmt.Sprintf("h2c://%s", net.JoinHostPort(ba.Address, strconv.Itoa(int(ba.Port)))),
|
||||
})
|
||||
}
|
||||
|
||||
return lb, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -260,23 +260,16 @@ func (p *Provider) loadService(route *gatev1.HTTPRoute, backendRef gatev1.HTTPBa
|
|||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonUnsupportedProtocol),
|
||||
Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s/%s/%s port is required", group, kind, namespace, backendRef.Name),
|
||||
Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s/%s/%s: port is required", group, kind, namespace, backendRef.Name),
|
||||
}
|
||||
}
|
||||
|
||||
portStr := strconv.FormatInt(int64(port), 10)
|
||||
serviceName = provider.Normalize(serviceName + "-" + portStr)
|
||||
|
||||
lb, err := p.loadHTTPServers(namespace, backendRef)
|
||||
if err != nil {
|
||||
return serviceName, nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonBackendNotFound),
|
||||
Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s/%s/%s: %s", group, kind, namespace, backendRef.Name, err),
|
||||
}
|
||||
lb, errCondition := p.loadHTTPServers(namespace, route, backendRef)
|
||||
if errCondition != nil {
|
||||
return serviceName, nil, errCondition
|
||||
}
|
||||
|
||||
return serviceName, &dynamic.Service{LoadBalancer: lb}, nil
|
||||
|
@ -372,74 +365,39 @@ func (p *Provider) loadHTTPRouteFilterExtensionRef(namespace string, extensionRe
|
|||
return filterFunc(string(extensionRef.Name), namespace)
|
||||
}
|
||||
|
||||
func (p *Provider) loadHTTPServers(namespace string, backendRef gatev1.HTTPBackendRef) (*dynamic.ServersLoadBalancer, error) {
|
||||
if backendRef.Port == nil {
|
||||
return nil, errors.New("port is required for Kubernetes Service reference")
|
||||
}
|
||||
|
||||
service, exists, err := p.client.GetService(namespace, string(backendRef.Name))
|
||||
func (p *Provider) loadHTTPServers(namespace string, route *gatev1.HTTPRoute, backendRef gatev1.HTTPBackendRef) (*dynamic.ServersLoadBalancer, *metav1.Condition) {
|
||||
backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef.BackendRef)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting service: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return nil, errors.New("service not found")
|
||||
}
|
||||
|
||||
var svcPort *corev1.ServicePort
|
||||
for _, p := range service.Spec.Ports {
|
||||
if p.Port == int32(*backendRef.Port) {
|
||||
svcPort = &p
|
||||
break
|
||||
return nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonBackendNotFound),
|
||||
Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s: %s", namespace, backendRef.Name, err),
|
||||
}
|
||||
}
|
||||
if svcPort == nil {
|
||||
return nil, fmt.Errorf("service port %d not found", *backendRef.Port)
|
||||
}
|
||||
|
||||
endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(backendRef.Name))
|
||||
protocol, err := getProtocol(svcPort)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting endpointslices: %w", err)
|
||||
}
|
||||
if len(endpointSlices) == 0 {
|
||||
return nil, errors.New("endpointslices not found")
|
||||
return nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonUnsupportedProtocol),
|
||||
Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s: %s", namespace, backendRef.Name, err),
|
||||
}
|
||||
}
|
||||
|
||||
lb := &dynamic.ServersLoadBalancer{}
|
||||
lb.SetDefaults()
|
||||
|
||||
protocol := getProtocol(*svcPort)
|
||||
|
||||
addresses := map[string]struct{}{}
|
||||
for _, endpointSlice := range endpointSlices {
|
||||
var port int32
|
||||
for _, p := range endpointSlice.Ports {
|
||||
if svcPort.Name == *p.Name {
|
||||
port = *p.Port
|
||||
break
|
||||
}
|
||||
}
|
||||
if port == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, endpoint := range endpointSlice.Endpoints {
|
||||
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, address := range endpoint.Addresses {
|
||||
if _, ok := addresses[address]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
addresses[address] = struct{}{}
|
||||
lb.Servers = append(lb.Servers, dynamic.Server{
|
||||
URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))),
|
||||
})
|
||||
}
|
||||
}
|
||||
for _, ba := range backendAddresses {
|
||||
lb.Servers = append(lb.Servers, dynamic.Server{
|
||||
URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(ba.Address, strconv.Itoa(int(ba.Port)))),
|
||||
})
|
||||
}
|
||||
|
||||
return lb, nil
|
||||
}
|
||||
|
||||
|
@ -702,13 +660,29 @@ func createURLRewrite(filter *gatev1.HTTPURLRewriteFilter, pathMatch gatev1.HTTP
|
|||
}, nil
|
||||
}
|
||||
|
||||
func getProtocol(portSpec corev1.ServicePort) string {
|
||||
protocol := "http"
|
||||
if portSpec.Port == 443 || strings.HasPrefix(portSpec.Name, "https") {
|
||||
protocol = "https"
|
||||
func getProtocol(portSpec corev1.ServicePort) (string, error) {
|
||||
if portSpec.Protocol != corev1.ProtocolTCP {
|
||||
return "", errors.New("only TCP protocol is supported")
|
||||
}
|
||||
|
||||
return protocol
|
||||
if portSpec.AppProtocol == nil {
|
||||
protocol := "http"
|
||||
if portSpec.Port == 443 || strings.HasPrefix(portSpec.Name, "https") {
|
||||
protocol = "https"
|
||||
}
|
||||
return protocol, nil
|
||||
}
|
||||
|
||||
switch ap := *portSpec.AppProtocol; ap {
|
||||
case appProtocolH2C:
|
||||
return "h2c", nil
|
||||
case appProtocolWS:
|
||||
return "http", nil
|
||||
case appProtocolWSS:
|
||||
return "https", nil
|
||||
default:
|
||||
return "", fmt.Errorf("unsupported application protocol %s", ap)
|
||||
}
|
||||
}
|
||||
|
||||
func mergeHTTPConfiguration(from, to *dynamic.Configuration) {
|
||||
|
|
|
@ -35,7 +35,8 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
providerName = "kubernetesgateway"
|
||||
providerName = "kubernetesgateway"
|
||||
|
||||
controllerName = "traefik.io/gateway-controller"
|
||||
|
||||
groupCore = "core"
|
||||
|
@ -48,6 +49,10 @@ const (
|
|||
kindTCPRoute = "TCPRoute"
|
||||
kindTLSRoute = "TLSRoute"
|
||||
kindService = "Service"
|
||||
|
||||
appProtocolH2C = "kubernetes.io/h2c"
|
||||
appProtocolWS = "kubernetes.io/ws"
|
||||
appProtocolWSS = "kubernetes.io/wss"
|
||||
)
|
||||
|
||||
// Provider holds configurations of the provider.
|
||||
|
@ -854,6 +859,79 @@ func (p *Provider) allowedNamespaces(gatewayNamespace string, routeNamespaces *g
|
|||
return nil, fmt.Errorf("unsupported RouteSelectType: %q", *routeNamespaces.From)
|
||||
}
|
||||
|
||||
type backendAddress struct {
|
||||
Address string
|
||||
Port int32
|
||||
}
|
||||
|
||||
func (p *Provider) getBackendAddresses(namespace string, ref gatev1.BackendRef) ([]backendAddress, corev1.ServicePort, error) {
|
||||
if ref.Port == nil {
|
||||
return nil, corev1.ServicePort{}, errors.New("port is required for Kubernetes Service reference")
|
||||
}
|
||||
|
||||
service, exists, err := p.client.GetService(namespace, string(ref.Name))
|
||||
if err != nil {
|
||||
return nil, corev1.ServicePort{}, fmt.Errorf("getting service: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return nil, corev1.ServicePort{}, errors.New("service not found")
|
||||
}
|
||||
|
||||
var svcPort *corev1.ServicePort
|
||||
for _, p := range service.Spec.Ports {
|
||||
if p.Port == int32(*ref.Port) {
|
||||
svcPort = &p
|
||||
break
|
||||
}
|
||||
}
|
||||
if svcPort == nil {
|
||||
return nil, corev1.ServicePort{}, fmt.Errorf("service port %d not found", *ref.Port)
|
||||
}
|
||||
|
||||
endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(ref.Name))
|
||||
if err != nil {
|
||||
return nil, corev1.ServicePort{}, fmt.Errorf("getting endpointslices: %w", err)
|
||||
}
|
||||
if len(endpointSlices) == 0 {
|
||||
return nil, corev1.ServicePort{}, errors.New("endpointslices not found")
|
||||
}
|
||||
|
||||
uniqAddresses := map[string]struct{}{}
|
||||
backendServers := make([]backendAddress, 0)
|
||||
for _, endpointSlice := range endpointSlices {
|
||||
var port int32
|
||||
for _, p := range endpointSlice.Ports {
|
||||
if svcPort.Name == *p.Name {
|
||||
port = *p.Port
|
||||
break
|
||||
}
|
||||
}
|
||||
if port == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, endpoint := range endpointSlice.Endpoints {
|
||||
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, address := range endpoint.Addresses {
|
||||
if _, ok := uniqAddresses[address]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
uniqAddresses[address] = struct{}{}
|
||||
backendServers = append(backendServers, backendAddress{
|
||||
Address: address,
|
||||
Port: port,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return backendServers, *svcPort, nil
|
||||
}
|
||||
|
||||
func supportedRouteKinds(protocol gatev1.ProtocolType, experimentalChannel bool) ([]gatev1.RouteGroupKind, []metav1.Condition) {
|
||||
group := gatev1.Group(gatev1.GroupName)
|
||||
|
||||
|
|
|
@ -2452,6 +2452,104 @@ func TestLoadHTTPRoutes_backendExtensionRef(t *testing.T) {
|
|||
TLS: &dynamic.TLSConfiguration{},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Simple HTTPRoute, with appProtocol service",
|
||||
paths: []string{"services.yml", "httproute/with_app_protocol_service.yml"},
|
||||
groupKindBackendFuncs: map[string]map[string]BuildBackendFunc{
|
||||
traefikv1alpha1.GroupName: {"TraefikService": func(name, namespace string) (string, *dynamic.Service, error) {
|
||||
// func should never be executed in case of cross-provider reference.
|
||||
return "", nil, errors.New("BOOM")
|
||||
}},
|
||||
},
|
||||
entryPoints: map[string]Entrypoint{"web": {
|
||||
Address: ":80",
|
||||
}},
|
||||
expected: &dynamic.Configuration{
|
||||
UDP: &dynamic.UDPConfiguration{
|
||||
Routers: map[string]*dynamic.UDPRouter{},
|
||||
Services: map[string]*dynamic.UDPService{},
|
||||
},
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: map[string]*dynamic.TCPRouter{},
|
||||
Middlewares: map[string]*dynamic.TCPMiddleware{},
|
||||
Services: map[string]*dynamic.TCPService{},
|
||||
ServersTransports: map[string]*dynamic.TCPServersTransport{},
|
||||
},
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: map[string]*dynamic.Router{
|
||||
"default-http-multi-protocols-my-gateway-web-0-1c0cf64bde37d9d0df06": {
|
||||
EntryPoints: []string{"web"},
|
||||
Service: "default-http-multi-protocols-my-gateway-web-0-1c0cf64bde37d9d0df06-wrr",
|
||||
Rule: "Host(`foo.com`) && Path(`/bar`)",
|
||||
Priority: 100008,
|
||||
RuleSyntax: "v3",
|
||||
},
|
||||
},
|
||||
Middlewares: map[string]*dynamic.Middleware{},
|
||||
Services: map[string]*dynamic.Service{
|
||||
"default-http-multi-protocols-my-gateway-web-0-1c0cf64bde37d9d0df06-wrr": {
|
||||
Weighted: &dynamic.WeightedRoundRobin{
|
||||
Services: []dynamic.WRRService{
|
||||
{
|
||||
Name: "default-whoami-h2c-80",
|
||||
Weight: ptr.To(1),
|
||||
},
|
||||
{
|
||||
Name: "default-whoami-ws-80",
|
||||
Weight: ptr.To(1),
|
||||
},
|
||||
{
|
||||
Name: "default-whoami-wss-80",
|
||||
Weight: ptr.To(1),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"default-whoami-h2c-80": {
|
||||
LoadBalancer: &dynamic.ServersLoadBalancer{
|
||||
Servers: []dynamic.Server{
|
||||
{
|
||||
URL: "h2c://10.10.0.13:80",
|
||||
},
|
||||
},
|
||||
PassHostHeader: ptr.To(true),
|
||||
ResponseForwarding: &dynamic.ResponseForwarding{
|
||||
FlushInterval: ptypes.Duration(100 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
},
|
||||
"default-whoami-ws-80": {
|
||||
LoadBalancer: &dynamic.ServersLoadBalancer{
|
||||
Servers: []dynamic.Server{
|
||||
{
|
||||
URL: "http://10.10.0.14:80",
|
||||
},
|
||||
},
|
||||
PassHostHeader: ptr.To(true),
|
||||
ResponseForwarding: &dynamic.ResponseForwarding{
|
||||
FlushInterval: ptypes.Duration(100 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
},
|
||||
"default-whoami-wss-80": {
|
||||
LoadBalancer: &dynamic.ServersLoadBalancer{
|
||||
Servers: []dynamic.Server{
|
||||
{
|
||||
URL: "https://10.10.0.15:80",
|
||||
},
|
||||
},
|
||||
PassHostHeader: ptr.To(true),
|
||||
ResponseForwarding: &dynamic.ResponseForwarding{
|
||||
FlushInterval: ptypes.Duration(100 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ServersTransports: map[string]*dynamic.ServersTransport{},
|
||||
},
|
||||
TLS: &dynamic.TLSConfiguration{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
|
|
|
@ -2,7 +2,6 @@ package gateway
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
|
@ -252,87 +251,45 @@ func (p *Provider) loadTCPService(route *gatev1alpha2.TCPRoute, backendRef gatev
|
|||
portStr := strconv.FormatInt(int64(port), 10)
|
||||
serviceName = provider.Normalize(serviceName + "-" + portStr)
|
||||
|
||||
lb, err := p.loadTCPServers(namespace, backendRef)
|
||||
if err != nil {
|
||||
return serviceName, nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonBackendNotFound),
|
||||
Message: fmt.Sprintf("Cannot load TCPRoute BackendRef %s/%s/%s/%s: %s", group, kind, namespace, backendRef.Name, err),
|
||||
}
|
||||
lb, errCondition := p.loadTCPServers(namespace, route, backendRef)
|
||||
if errCondition != nil {
|
||||
return serviceName, nil, errCondition
|
||||
}
|
||||
|
||||
return serviceName, &dynamic.TCPService{LoadBalancer: lb}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) loadTCPServers(namespace string, backendRef gatev1.BackendRef) (*dynamic.TCPServersLoadBalancer, error) {
|
||||
if backendRef.Port == nil {
|
||||
return nil, errors.New("port is required for Kubernetes Service reference")
|
||||
}
|
||||
|
||||
service, exists, err := p.client.GetService(namespace, string(backendRef.Name))
|
||||
func (p *Provider) loadTCPServers(namespace string, route *gatev1alpha2.TCPRoute, backendRef gatev1.BackendRef) (*dynamic.TCPServersLoadBalancer, *metav1.Condition) {
|
||||
backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting service: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return nil, errors.New("service not found")
|
||||
}
|
||||
|
||||
var svcPort *corev1.ServicePort
|
||||
for _, p := range service.Spec.Ports {
|
||||
if p.Port == int32(*backendRef.Port) {
|
||||
svcPort = &p
|
||||
break
|
||||
return nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.GetGeneration(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonBackendNotFound),
|
||||
Message: fmt.Sprintf("Cannot load TCPRoute BackendRef %s/%s: %s", namespace, backendRef.Name, err),
|
||||
}
|
||||
}
|
||||
if svcPort == nil {
|
||||
return nil, fmt.Errorf("service port %d not found", *backendRef.Port)
|
||||
}
|
||||
|
||||
endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(backendRef.Name))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting endpointslices: %w", err)
|
||||
}
|
||||
if len(endpointSlices) == 0 {
|
||||
return nil, errors.New("endpointslices not found")
|
||||
if svcPort.Protocol != corev1.ProtocolTCP {
|
||||
return nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.GetGeneration(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonUnsupportedProtocol),
|
||||
Message: fmt.Sprintf("Cannot load TCPRoute BackendRef %s/%s: only TCP protocol is supported", namespace, backendRef.Name),
|
||||
}
|
||||
}
|
||||
|
||||
lb := &dynamic.TCPServersLoadBalancer{}
|
||||
|
||||
addresses := map[string]struct{}{}
|
||||
for _, endpointSlice := range endpointSlices {
|
||||
var port int32
|
||||
for _, p := range endpointSlice.Ports {
|
||||
if svcPort.Name == *p.Name {
|
||||
port = *p.Port
|
||||
break
|
||||
}
|
||||
}
|
||||
if port == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, endpoint := range endpointSlice.Endpoints {
|
||||
if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, address := range endpoint.Addresses {
|
||||
if _, ok := addresses[address]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
addresses[address] = struct{}{}
|
||||
lb.Servers = append(lb.Servers, dynamic.TCPServer{
|
||||
// TODO determine whether the servers needs TLS, from the port?
|
||||
Address: net.JoinHostPort(address, strconv.Itoa(int(port))),
|
||||
})
|
||||
}
|
||||
}
|
||||
for _, ba := range backendAddresses {
|
||||
lb.Servers = append(lb.Servers, dynamic.TCPServer{
|
||||
Address: net.JoinHostPort(ba.Address, strconv.Itoa(int(ba.Port))),
|
||||
})
|
||||
}
|
||||
|
||||
return lb, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package gateway
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -10,6 +11,7 @@ import (
|
|||
"github.com/rs/zerolog/log"
|
||||
"github.com/traefik/traefik/v3/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v3/pkg/provider"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
ktypes "k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/utils/ptr"
|
||||
|
@ -251,21 +253,49 @@ func (p *Provider) loadTLSService(route *gatev1alpha2.TLSRoute, backendRef gatev
|
|||
portStr := strconv.FormatInt(int64(port), 10)
|
||||
serviceName = provider.Normalize(serviceName + "-" + portStr)
|
||||
|
||||
lb, err := p.loadTCPServers(namespace, backendRef)
|
||||
if err != nil {
|
||||
return serviceName, nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.Generation,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonBackendNotFound),
|
||||
Message: fmt.Sprintf("Cannot load TLSRoute BackendRef %s/%s/%s/%s: %s", group, kind, namespace, backendRef.Name, err),
|
||||
}
|
||||
lb, errCondition := p.loadTLSServers(namespace, route, backendRef)
|
||||
if errCondition != nil {
|
||||
return serviceName, nil, errCondition
|
||||
}
|
||||
|
||||
return serviceName, &dynamic.TCPService{LoadBalancer: lb}, nil
|
||||
}
|
||||
|
||||
func (p *Provider) loadTLSServers(namespace string, route *gatev1alpha2.TLSRoute, backendRef gatev1.BackendRef) (*dynamic.TCPServersLoadBalancer, *metav1.Condition) {
|
||||
backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef)
|
||||
if err != nil {
|
||||
return nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.GetGeneration(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonBackendNotFound),
|
||||
Message: fmt.Sprintf("Cannot load TLSRoute BackendRef %s/%s: %s", namespace, backendRef.Name, err),
|
||||
}
|
||||
}
|
||||
|
||||
if svcPort.Protocol != corev1.ProtocolTCP {
|
||||
return nil, &metav1.Condition{
|
||||
Type: string(gatev1.RouteConditionResolvedRefs),
|
||||
Status: metav1.ConditionFalse,
|
||||
ObservedGeneration: route.GetGeneration(),
|
||||
LastTransitionTime: metav1.Now(),
|
||||
Reason: string(gatev1.RouteReasonUnsupportedProtocol),
|
||||
Message: fmt.Sprintf("Cannot load TLSRoute BackendRef %s/%s: only TCP protocol is supported", namespace, backendRef.Name),
|
||||
}
|
||||
}
|
||||
|
||||
lb := &dynamic.TCPServersLoadBalancer{}
|
||||
|
||||
for _, ba := range backendAddresses {
|
||||
lb.Servers = append(lb.Servers, dynamic.TCPServer{
|
||||
// TODO determine whether the servers needs TLS, from the port?
|
||||
Address: net.JoinHostPort(ba.Address, strconv.Itoa(int(ba.Port))),
|
||||
})
|
||||
}
|
||||
return lb, nil
|
||||
}
|
||||
|
||||
func hostSNIRule(hostnames []gatev1.Hostname) string {
|
||||
rules := make([]string, 0, len(hostnames))
|
||||
uniqHostnames := map[gatev1.Hostname]struct{}{}
|
||||
|
|
Loading…
Reference in a new issue