Add new ingressClass support to ingress provider

* add new ingressClass

* add doc

* lint

* adjust behavior to look for a class with a specific controller

* remove looking strange test ingressclass

* return nil rather than en empty object

* change documentation

* apply @kevinpollet suggestion

* change order of processIngress to be correct and adjust tests

* review: clean.

* review: clean.

* Fix for review

Co-authored-by: Manuel Zapf <manuel@containo.us>
Co-authored-by: Fernandez Ludovic <ludovic@containo.us>
Co-authored-by: Michael <michael.matur@gmail.com>
This commit is contained in:
Daniel Tomcej 2020-07-15 10:18:03 -07:00 committed by GitHub
parent 1ef93fead7
commit cb6ec507e2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 265 additions and 22 deletions

View file

@ -258,6 +258,17 @@ Value of `kubernetes.io/ingress.class` annotation that identifies Ingress object
If the parameter is non-empty, only Ingresses containing an annotation with the same value are processed. If the parameter is non-empty, only Ingresses containing an annotation with the same value are processed.
Otherwise, Ingresses missing the annotation, having an empty value, or with the value `traefik` are processed. Otherwise, Ingresses missing the annotation, having an empty value, or with the value `traefik` are processed.
#### ingressClass on Kubernetes 1.18+
If you cluster is running kubernetes 1.18+,
you can also leverage the newly Introduced `IngressClass` resource to define which Ingress Objects to handle.
In that case, Traefik will look for an `IngressClass` in your cluster with the controller of *traefik.io/ingress-controller* inside the spec.
!!! note ""
Please note, the ingressClass configuration on the provider is not used then anymore.
Please see [this article](https://kubernetes.io/blog/2020/04/02/improvements-to-the-ingress-api-in-kubernetes-1.18/) for more information.
### `ingressEndpoint` ### `ingressEndpoint`
#### `hostname` #### `hostname`

View file

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"strconv"
"time" "time"
"github.com/containous/traefik/v2/pkg/log" "github.com/containous/traefik/v2/pkg/log"
@ -49,15 +50,18 @@ func (reh *resourceEventHandler) OnDelete(obj interface{}) {
type Client interface { type Client interface {
WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error)
GetIngresses() []*networkingv1beta1.Ingress GetIngresses() []*networkingv1beta1.Ingress
GetIngressClass() (*networkingv1beta1.IngressClass, error)
GetService(namespace, name string) (*corev1.Service, bool, error) GetService(namespace, name string) (*corev1.Service, bool, error)
GetSecret(namespace, name string) (*corev1.Secret, bool, error) GetSecret(namespace, name string) (*corev1.Secret, bool, error)
GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error)
UpdateIngressStatus(ing *networkingv1beta1.Ingress, ip, hostname string) error UpdateIngressStatus(ing *networkingv1beta1.Ingress, ip, hostname string) error
GetServerVersion() (major, minor int, err error)
} }
type clientWrapper struct { type clientWrapper struct {
clientset *kubernetes.Clientset clientset *kubernetes.Clientset
factories map[string]informers.SharedInformerFactory factories map[string]informers.SharedInformerFactory
clusterFactory informers.SharedInformerFactory
ingressLabelSelector labels.Selector ingressLabelSelector labels.Selector
isNamespaceAll bool isNamespaceAll bool
watchedNamespaces []string watchedNamespaces []string
@ -152,9 +156,27 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
} }
for _, ns := range namespaces { for _, ns := range namespaces {
for t, ok := range c.factories[ns].WaitForCacheSync(stopCh) { for typ, ok := range c.factories[ns].WaitForCacheSync(stopCh) {
if !ok { if !ok {
return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", t.String(), ns) return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", typ, ns)
}
}
}
// If the kubernetes cluster is v1.18+, we can use the new IngressClass objects
major, minor, err := c.GetServerVersion()
if err != nil {
return nil, err
}
if major >= 1 && minor >= 18 {
c.clusterFactory = informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod)
c.clusterFactory.Networking().V1beta1().IngressClasses().Informer().AddEventHandler(eventHandler)
c.clusterFactory.Start(stopCh)
for typ, ok := range c.clusterFactory.WaitForCacheSync(stopCh) {
if !ok {
return nil, fmt.Errorf("timed out waiting for controller caches to sync %s", typ)
} }
} }
} }
@ -307,6 +329,25 @@ func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool,
return secret, exist, err return secret, exist, err
} }
func (c *clientWrapper) GetIngressClass() (*networkingv1beta1.IngressClass, error) {
if c.clusterFactory == nil {
return nil, errors.New("failed to find ingressClass: factory not loaded")
}
ingressClasses, err := c.clusterFactory.Networking().V1beta1().IngressClasses().Lister().List(labels.Everything())
if err != nil {
return nil, err
}
for _, ic := range ingressClasses {
if ic.Spec.Controller == traefikDefaultIngressClassController {
return ic, err
}
}
return nil, nil
}
// lookupNamespace returns the lookup namespace key for the given namespace. // lookupNamespace returns the lookup namespace key for the given namespace.
// When listening on all namespaces, it returns the client-go identifier ("") // When listening on all namespaces, it returns the client-go identifier ("")
// for all-namespaces. Otherwise, it returns the given namespace. // for all-namespaces. Otherwise, it returns the given namespace.
@ -339,6 +380,26 @@ func (c *clientWrapper) newResourceEventHandler(events chan<- interface{}) cache
} }
} }
// GetServerVersion returns the cluster server version, or an error.
func (c *clientWrapper) GetServerVersion() (major, minor int, err error) {
version, err := c.clientset.Discovery().ServerVersion()
if err != nil {
return 0, 0, fmt.Errorf("could not determine cluster API version: %w", err)
}
major, err = strconv.Atoi(version.Major)
if err != nil {
return 0, 0, fmt.Errorf("could not determine cluster major API version: %w", err)
}
minor, err = strconv.Atoi(version.Minor)
if err != nil {
return 0, 0, fmt.Errorf("could not determine cluster minor API version: %w", err)
}
return major, minor, nil
}
// eventHandlerFunc will pass the obj on to the events channel or drop it. // eventHandlerFunc will pass the obj on to the events channel or drop it.
// This is so passing the events along won't block in the case of high volume. // This is so passing the events along won't block in the case of high volume.
// The events are only used for signaling anyway so dropping a few is ok. // The events are only used for signaling anyway so dropping a few is ok.

View file

@ -8,15 +8,20 @@ import (
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1" extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/api/networking/v1beta1" "k8s.io/api/networking/v1beta1"
networkingv1beta1 "k8s.io/api/networking/v1beta1"
) )
var _ Client = (*clientMock)(nil) var _ Client = (*clientMock)(nil)
type clientMock struct { type clientMock struct {
ingresses []*v1beta1.Ingress ingresses []*v1beta1.Ingress
services []*corev1.Service services []*corev1.Service
secrets []*corev1.Secret secrets []*corev1.Secret
endpoints []*corev1.Endpoints endpoints []*corev1.Endpoints
ingressClass *networkingv1beta1.IngressClass
serverMajor int
serverMinor int
apiServiceError error apiServiceError error
apiSecretError error apiSecretError error
@ -26,8 +31,11 @@ type clientMock struct {
watchChan chan interface{} watchChan chan interface{}
} }
func newClientMock(paths ...string) clientMock { func newClientMock(major, minor int, paths ...string) clientMock {
var c clientMock c := clientMock{
serverMajor: major,
serverMinor: minor,
}
for _, path := range paths { for _, path := range paths {
yamlContent, err := ioutil.ReadFile(path) yamlContent, err := ioutil.ReadFile(path)
@ -52,6 +60,8 @@ func newClientMock(paths ...string) clientMock {
panic(err) panic(err)
} }
c.ingresses = append(c.ingresses, ing) c.ingresses = append(c.ingresses, ing)
case *networkingv1beta1.IngressClass:
c.ingressClass = o
default: default:
panic(fmt.Sprintf("Unknown runtime object %+v %T", o, o)) panic(fmt.Sprintf("Unknown runtime object %+v %T", o, o))
} }
@ -65,6 +75,10 @@ func (c clientMock) GetIngresses() []*v1beta1.Ingress {
return c.ingresses return c.ingresses
} }
func (c clientMock) GetServerVersion() (major, minor int, err error) {
return c.serverMajor, c.serverMinor, nil
}
func (c clientMock) GetService(namespace, name string) (*corev1.Service, bool, error) { func (c clientMock) GetService(namespace, name string) (*corev1.Service, bool, error) {
if c.apiServiceError != nil { if c.apiServiceError != nil {
return nil, false, c.apiServiceError return nil, false, c.apiServiceError
@ -105,6 +119,10 @@ func (c clientMock) GetSecret(namespace, name string) (*corev1.Secret, bool, err
return nil, false, nil return nil, false, nil
} }
func (c clientMock) GetIngressClass() (*networkingv1beta1.IngressClass, error) {
return c.ingressClass, nil
}
func (c clientMock) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) { func (c clientMock) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) {
return c.watchChan, nil return c.watchChan, nil
} }

View file

@ -0,0 +1,11 @@
kind: Endpoints
apiVersion: v1
metadata:
name: service1
namespace: testing
subsets:
- addresses:
- ip: 10.10.0.1
ports:
- port: 8080

View file

@ -0,0 +1,14 @@
kind: Ingress
apiVersion: networking.k8s.io/v1beta1
metadata:
name: ""
namespace: testing
spec:
ingressClassName: traefik-lb
rules:
- http:
paths:
- path: /bar
backend:
serviceName: service1
servicePort: 80

View file

@ -0,0 +1,6 @@
apiVersion: networking.k8s.io/v1beta1
kind: IngressClass
metadata:
name: traefik-lb
spec:
controller: traefik.io/ingress-controller

View file

@ -0,0 +1,10 @@
kind: Service
apiVersion: v1
metadata:
name: service1
namespace: testing
spec:
ports:
- port: 80
clusterIp: 10.0.0.1

View file

@ -0,0 +1,11 @@
kind: Endpoints
apiVersion: v1
metadata:
name: service1
namespace: testing
subsets:
- addresses:
- ip: 10.10.0.1
ports:
- port: 8080

View file

@ -0,0 +1,14 @@
kind: Ingress
apiVersion: networking.k8s.io/v1beta1
metadata:
name: ""
namespace: testing
spec:
ingressClassName: traefik-lb
rules:
- http:
paths:
- path: /bar
backend:
serviceName: service1
servicePort: 80

View file

@ -0,0 +1,10 @@
kind: Service
apiVersion: v1
metadata:
name: service1
namespace: testing
spec:
ports:
- port: 80
clusterIp: 10.0.0.1

View file

@ -21,14 +21,16 @@ import (
"github.com/mitchellh/hashstructure" "github.com/mitchellh/hashstructure"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/api/networking/v1beta1" "k8s.io/api/networking/v1beta1"
networkingv1beta1 "k8s.io/api/networking/v1beta1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
) )
const ( const (
annotationKubernetesIngressClass = "kubernetes.io/ingress.class" annotationKubernetesIngressClass = "kubernetes.io/ingress.class"
traefikDefaultIngressClass = "traefik" traefikDefaultIngressClass = "traefik"
defaultPathMatcher = "PathPrefix" traefikDefaultIngressClassController = "traefik.io/ingress-controller"
defaultPathMatcher = "PathPrefix"
) )
// Provider holds configurations of the provider. // Provider holds configurations of the provider.
@ -181,13 +183,36 @@ func (p *Provider) loadConfigurationFromIngresses(ctx context.Context, client Cl
TCP: &dynamic.TCPConfiguration{}, TCP: &dynamic.TCPConfiguration{},
} }
major, minor, err := client.GetServerVersion()
if err != nil {
log.FromContext(ctx).Errorf("Failed to get server version: %v", err)
return conf
}
var ingressClass *networkingv1beta1.IngressClass
if major >= 1 && minor >= 18 {
ic, err := client.GetIngressClass()
if err != nil {
log.FromContext(ctx).Errorf("Failed to find an ingress class: %v", err)
return conf
}
if ic == nil {
log.FromContext(ctx).Errorf("No ingress class for the traefik-controller in the cluster")
return conf
}
ingressClass = ic
}
ingresses := client.GetIngresses() ingresses := client.GetIngresses()
certConfigs := make(map[string]*tls.CertAndStores) certConfigs := make(map[string]*tls.CertAndStores)
for _, ingress := range ingresses { for _, ingress := range ingresses {
ctx = log.With(ctx, log.Str("ingress", ingress.Name), log.Str("namespace", ingress.Namespace)) ctx = log.With(ctx, log.Str("ingress", ingress.Name), log.Str("namespace", ingress.Namespace))
if !shouldProcessIngress(p.IngressClass, ingress.Annotations[annotationKubernetesIngressClass]) { if !p.shouldProcessIngress(p.IngressClass, ingress, ingressClass) {
continue continue
} }
@ -273,7 +298,7 @@ func (p *Provider) loadConfigurationFromIngresses(ctx context.Context, client Cl
} }
func (p *Provider) updateIngressStatus(ing *v1beta1.Ingress, k8sClient Client) error { func (p *Provider) updateIngressStatus(ing *v1beta1.Ingress, k8sClient Client) error {
// Only process if an EndpointIngress has been configured // Only process if an EndpointIngress has been configured.
if p.IngressEndpoint == nil { if p.IngressEndpoint == nil {
return nil return nil
} }
@ -311,6 +336,12 @@ func (p *Provider) updateIngressStatus(ing *v1beta1.Ingress, k8sClient Client) e
return k8sClient.UpdateIngressStatus(ing, service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname) return k8sClient.UpdateIngressStatus(ing, service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname)
} }
func (p *Provider) shouldProcessIngress(providerIngressClass string, ingress *networkingv1beta1.Ingress, ingressClass *networkingv1beta1.IngressClass) bool {
return ingressClass != nil && ingress.Spec.IngressClassName != nil && ingressClass.ObjectMeta.Name == *ingress.Spec.IngressClassName ||
providerIngressClass == ingress.Annotations[annotationKubernetesIngressClass] ||
len(providerIngressClass) == 0 && ingress.Annotations[annotationKubernetesIngressClass] == traefikDefaultIngressClass
}
func buildHostRule(host string) string { func buildHostRule(host string) string {
if strings.HasPrefix(host, "*.") { if strings.HasPrefix(host, "*.") {
return "HostRegexp(`" + strings.Replace(host, "*.", "{subdomain:[a-zA-Z0-9-]+}.", 1) + "`)" return "HostRegexp(`" + strings.Replace(host, "*.", "{subdomain:[a-zA-Z0-9-]+}.", 1) + "`)"
@ -319,11 +350,6 @@ func buildHostRule(host string) string {
return "Host(`" + host + "`)" return "Host(`" + host + "`)"
} }
func shouldProcessIngress(ingressClass, ingressClassAnnotation string) bool {
return ingressClass == ingressClassAnnotation ||
(len(ingressClass) == 0 && ingressClassAnnotation == traefikDefaultIngressClass)
}
func getCertificates(ctx context.Context, ingress *v1beta1.Ingress, k8sClient Client, tlsConfigs map[string]*tls.CertAndStores) error { func getCertificates(ctx context.Context, ingress *v1beta1.Ingress, k8sClient Client, tlsConfigs map[string]*tls.CertAndStores) error {
for _, t := range ingress.Spec.TLS { for _, t := range ingress.Spec.TLS {
if t.SecretName == "" { if t.SecretName == "" {
@ -552,7 +578,8 @@ func throttleEvents(ctx context.Context, throttleDuration time.Duration, pool *s
if throttleDuration == 0 { if throttleDuration == 0 {
return nil return nil
} }
// Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling)
// Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling).
eventsChanBuffered := make(chan interface{}, 1) eventsChanBuffered := make(chan interface{}, 1)
// Run a goroutine that reads events from eventChan and does a // Run a goroutine that reads events from eventChan and does a
@ -571,7 +598,7 @@ func throttleEvents(ctx context.Context, throttleDuration time.Duration, pool *s
// We already have an event in eventsChanBuffered, so we'll // We already have an event in eventsChanBuffered, so we'll
// do a refresh as soon as our throttle allows us to. It's fine // do a refresh as soon as our throttle allows us to. It's fine
// to drop the event and keep whatever's in the buffer -- we // to drop the event and keep whatever's in the buffer -- we
// don't do different things for different events // don't do different things for different events.
log.FromContext(ctx).Debugf("Dropping event kind %T due to throttling", nextEvent) log.FromContext(ctx).Debugf("Dropping event kind %T due to throttling", nextEvent)
} }
} }

View file

@ -26,6 +26,7 @@ func TestLoadConfigurationFromIngresses(t *testing.T) {
testCases := []struct { testCases := []struct {
desc string desc string
ingressClass string ingressClass string
serverMinor int
expected *dynamic.Configuration expected *dynamic.Configuration
}{ }{
{ {
@ -923,6 +924,46 @@ func TestLoadConfigurationFromIngresses(t *testing.T) {
}, },
}, },
}, },
{
desc: "v18 Ingress with ingressClass",
serverMinor: 18,
expected: &dynamic.Configuration{
TCP: &dynamic.TCPConfiguration{},
HTTP: &dynamic.HTTPConfiguration{
Middlewares: map[string]*dynamic.Middleware{},
Routers: map[string]*dynamic.Router{
"testing-bar": {
Rule: "PathPrefix(`/bar`)",
Service: "testing-service1-80",
},
},
Services: map[string]*dynamic.Service{
"testing-service1-80": {
LoadBalancer: &dynamic.ServersLoadBalancer{
PassHostHeader: Bool(true),
Servers: []dynamic.Server{
{
URL: "http://10.10.0.1:8080",
},
},
},
},
},
},
},
},
{
desc: "v18 Ingress with missing ingressClass",
serverMinor: 18,
expected: &dynamic.Configuration{
TCP: &dynamic.TCPConfiguration{},
HTTP: &dynamic.HTTPConfiguration{
Middlewares: map[string]*dynamic.Middleware{},
Routers: map[string]*dynamic.Router{},
Services: map[string]*dynamic.Service{},
},
},
},
} }
for _, test := range testCases { for _, test := range testCases {
@ -947,8 +988,17 @@ func TestLoadConfigurationFromIngresses(t *testing.T) {
if err == nil { if err == nil {
paths = append(paths, generateTestFilename("_secret", test.desc)) paths = append(paths, generateTestFilename("_secret", test.desc))
} }
_, err = os.Stat(generateTestFilename("_ingressclass", test.desc))
if err == nil {
paths = append(paths, generateTestFilename("_ingressclass", test.desc))
}
clientMock := newClientMock(paths...) serverMinor := 17
if test.serverMinor != 0 {
serverMinor = test.serverMinor
}
clientMock := newClientMock(1, serverMinor, paths...)
p := Provider{IngressClass: test.ingressClass} p := Provider{IngressClass: test.ingressClass}
conf := p.loadConfigurationFromIngresses(context.Background(), clientMock) conf := p.loadConfigurationFromIngresses(context.Background(), clientMock)

View file

@ -12,7 +12,7 @@ import (
// MustParseYaml parses a YAML to objects. // MustParseYaml parses a YAML to objects.
func MustParseYaml(content []byte) []runtime.Object { func MustParseYaml(content []byte) []runtime.Object {
acceptedK8sTypes := regexp.MustCompile(`^(Deployment|Endpoints|Service|Ingress|IngressRoute|IngressRouteTCP|IngressRouteUDP|Middleware|Secret|TLSOption|TLSStore|TraefikService)$`) acceptedK8sTypes := regexp.MustCompile(`^(Deployment|Endpoints|Service|Ingress|IngressRoute|IngressRouteTCP|IngressRouteUDP|Middleware|Secret|TLSOption|TLSStore|TraefikService|IngressClass)$`)
files := strings.Split(string(content), "---") files := strings.Split(string(content), "---")
retVal := make([]runtime.Object, 0, len(files)) retVal := make([]runtime.Object, 0, len(files))