From be0845af027d974ad2d5a41ab843b2e3c191376a Mon Sep 17 00:00:00 2001 From: Romain Date: Fri, 20 Nov 2020 00:18:04 +0100 Subject: [PATCH] Apply labelSelector as a TweakListOptions for Kubernetes informers --- docs/content/providers/kubernetes-crd.md | 14 ++- docs/content/providers/kubernetes-ingress.md | 10 +- integration/fixtures/k8s/03-ingress.yml | 2 + integration/fixtures/k8s/03-ingressroute.yml | 2 + integration/fixtures/k8s/03-tlsoption.yml | 2 + integration/fixtures/k8s/03-tlsstore.yml | 2 + .../k8s/06-ingressroute-traefikservices.yml | 2 + .../fixtures/k8s_crd_label_selector.toml | 19 ++++ .../fixtures/k8s_ingress_label_selector.toml | 16 +++ integration/k8s_test.go | 22 ++++ .../testdata/rawdata-crd-label-selector.json | 65 +++++++++++ .../rawdata-ingress-label-selector.json | 106 ++++++++++++++++++ pkg/provider/kubernetes/crd/client.go | 52 +++------ pkg/provider/kubernetes/crd/client_test.go | 4 +- pkg/provider/kubernetes/crd/kubernetes.go | 18 +-- pkg/provider/kubernetes/ingress/client.go | 61 +++++----- pkg/provider/kubernetes/ingress/kubernetes.go | 18 +-- 17 files changed, 317 insertions(+), 98 deletions(-) create mode 100644 integration/fixtures/k8s_crd_label_selector.toml create mode 100644 integration/fixtures/k8s_ingress_label_selector.toml create mode 100644 integration/testdata/rawdata-crd-label-selector.json create mode 100644 integration/testdata/rawdata-ingress-label-selector.json diff --git a/docs/content/providers/kubernetes-crd.md b/docs/content/providers/kubernetes-crd.md index 5f3f06086..41a6c5b89 100644 --- a/docs/content/providers/kubernetes-crd.md +++ b/docs/content/providers/kubernetes-crd.md @@ -177,26 +177,32 @@ _Optional,Default: empty (process all resources)_ ```toml tab="File (TOML)" [providers.kubernetesCRD] - labelselector = "A and not B" + labelselector = "app=traefik" # ... ``` ```yaml tab="File (YAML)" providers: kubernetesCRD: - labelselector: "A and not B" + labelselector: "app=traefik" # ... ``` ```bash tab="CLI" ---providers.kubernetescrd.labelselector="A and not B" +--providers.kubernetescrd.labelselector="app=traefik" ``` By default, Traefik processes all resource objects in the configured namespaces. -A label selector can be defined to filter on specific resource objects only. +A label selector can be defined to filter on specific resource objects only, +this will apply only on Traefik [Custom Resources](../routing/providers/kubernetes-crd.md#custom-resource-definition-crd) +and has no effect on Kubernetes `Secrets`, `Endpoints` and `Services`. See [label-selectors](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors) for details. +!!! warning + + As the LabelSelector is applied to all Traefik Custom Resources, they all must match the filter. + ### `ingressClass` _Optional, Default: empty_ diff --git a/docs/content/providers/kubernetes-ingress.md b/docs/content/providers/kubernetes-ingress.md index c2e1d5346..3e165389e 100644 --- a/docs/content/providers/kubernetes-ingress.md +++ b/docs/content/providers/kubernetes-ingress.md @@ -212,23 +212,23 @@ _Optional,Default: empty (process all Ingresses)_ ```toml tab="File (TOML)" [providers.kubernetesIngress] - labelSelector = "A and not B" + labelSelector = "app=traefik" # ... ``` ```yaml tab="File (YAML)" providers: kubernetesIngress: - labelselector: "A and not B" + labelselector: "app=traefik" # ... ``` ```bash tab="CLI" ---providers.kubernetesingress.labelselector="A and not B" +--providers.kubernetesingress.labelselector="app=traefik" ``` -By default, Traefik processes all Ingress objects in the configured namespaces. -A label selector can be defined to filter on specific Ingress objects only. +By default, Traefik processes all `Ingress` objects in the configured namespaces. +A label selector can be defined to filter on specific `Ingress` objects only. See [label-selectors](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors) for details. diff --git a/integration/fixtures/k8s/03-ingress.yml b/integration/fixtures/k8s/03-ingress.yml index 077572b81..22dde97cc 100644 --- a/integration/fixtures/k8s/03-ingress.yml +++ b/integration/fixtures/k8s/03-ingress.yml @@ -3,6 +3,8 @@ kind: Ingress metadata: name: test.ingress namespace: default + labels: + app: traefik spec: rules: diff --git a/integration/fixtures/k8s/03-ingressroute.yml b/integration/fixtures/k8s/03-ingressroute.yml index 4c48254a0..c040988b5 100644 --- a/integration/fixtures/k8s/03-ingressroute.yml +++ b/integration/fixtures/k8s/03-ingressroute.yml @@ -3,6 +3,8 @@ kind: IngressRoute metadata: name: test.route namespace: default + labels: + app: traefik spec: entryPoints: diff --git a/integration/fixtures/k8s/03-tlsoption.yml b/integration/fixtures/k8s/03-tlsoption.yml index 5c00a3973..7c5df10fc 100644 --- a/integration/fixtures/k8s/03-tlsoption.yml +++ b/integration/fixtures/k8s/03-tlsoption.yml @@ -3,6 +3,8 @@ kind: TLSOption metadata: name: mytlsoption namespace: default + labels: + app: traefik spec: minVersion: VersionTLS12 diff --git a/integration/fixtures/k8s/03-tlsstore.yml b/integration/fixtures/k8s/03-tlsstore.yml index 4ff03107d..5dea5d0fd 100644 --- a/integration/fixtures/k8s/03-tlsstore.yml +++ b/integration/fixtures/k8s/03-tlsstore.yml @@ -3,6 +3,8 @@ kind: TLSStore metadata: name: mytlsstore namespace: default + labels: + app: traefik spec: defaultCertificate: diff --git a/integration/fixtures/k8s/06-ingressroute-traefikservices.yml b/integration/fixtures/k8s/06-ingressroute-traefikservices.yml index a8cee1bee..3e096d1a7 100644 --- a/integration/fixtures/k8s/06-ingressroute-traefikservices.yml +++ b/integration/fixtures/k8s/06-ingressroute-traefikservices.yml @@ -50,6 +50,8 @@ kind: IngressRoute metadata: name: api.route namespace: default + labels: + app: traefik spec: entryPoints: diff --git a/integration/fixtures/k8s_crd_label_selector.toml b/integration/fixtures/k8s_crd_label_selector.toml new file mode 100644 index 000000000..ca7e51cbd --- /dev/null +++ b/integration/fixtures/k8s_crd_label_selector.toml @@ -0,0 +1,19 @@ +[global] + checkNewVersion = false + sendAnonymousUsage = false + +[log] + level = "DEBUG" + +[api] + +[entryPoints] + [entryPoints.footcp] + address = ":8093" + [entryPoints.fooudp] + address = ":8090/udp" + [entryPoints.web] + address = ":8000" + +[providers.kubernetesCRD] + labelSelector = "app=traefik" diff --git a/integration/fixtures/k8s_ingress_label_selector.toml b/integration/fixtures/k8s_ingress_label_selector.toml new file mode 100644 index 000000000..86f411e98 --- /dev/null +++ b/integration/fixtures/k8s_ingress_label_selector.toml @@ -0,0 +1,16 @@ +[global] + checkNewVersion = false + sendAnonymousUsage = false + +[log] + level = "DEBUG" + +[api] + insecure = true + +[entryPoints] + [entryPoints.web] + address = ":8000" + +[providers.kubernetesIngress] + labelSelector = "app=traefik" diff --git a/integration/k8s_test.go b/integration/k8s_test.go index c06d83a0f..60bd25b36 100644 --- a/integration/k8s_test.go +++ b/integration/k8s_test.go @@ -74,6 +74,17 @@ func (s *K8sSuite) TestIngressConfiguration(c *check.C) { testConfiguration(c, "testdata/rawdata-ingress.json", "8080") } +func (s *K8sSuite) TestIngressLabelSelector(c *check.C) { + cmd, display := s.traefikCmd(withConfigFile("fixtures/k8s_ingress_label_selector.toml")) + defer display(c) + + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer s.killCmd(cmd) + + testConfiguration(c, "testdata/rawdata-ingress-label-selector.json", "8080") +} + func (s *K8sSuite) TestCRDConfiguration(c *check.C) { cmd, display := s.traefikCmd(withConfigFile("fixtures/k8s_crd.toml")) defer display(c) @@ -85,6 +96,17 @@ func (s *K8sSuite) TestCRDConfiguration(c *check.C) { testConfiguration(c, "testdata/rawdata-crd.json", "8000") } +func (s *K8sSuite) TestCRDLabelSelector(c *check.C) { + cmd, display := s.traefikCmd(withConfigFile("fixtures/k8s_crd_label_selector.toml")) + defer display(c) + + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer s.killCmd(cmd) + + testConfiguration(c, "testdata/rawdata-crd-label-selector.json", "8000") +} + func testConfiguration(c *check.C, path, apiPort string) { err := try.GetRequest("http://127.0.0.1:"+apiPort+"/api/entrypoints", 20*time.Second, try.BodyContains(`"name":"web"`)) c.Assert(err, checker.IsNil) diff --git a/integration/testdata/rawdata-crd-label-selector.json b/integration/testdata/rawdata-crd-label-selector.json new file mode 100644 index 000000000..d75df29a0 --- /dev/null +++ b/integration/testdata/rawdata-crd-label-selector.json @@ -0,0 +1,65 @@ +{ + "routers": { + "default-api-route-29f28a463fb5d5ba16d2@kubernetescrd": { + "entryPoints": [ + "web" + ], + "service": "api@internal", + "rule": "PathPrefix(`/api`)", + "status": "enabled", + "using": [ + "web" + ] + }, + "default-test-route-6b204d94623b3df4370c@kubernetescrd": { + "entryPoints": [ + "web" + ], + "service": "default-test-route-6b204d94623b3df4370c", + "rule": "Host(`foo.com`) \u0026\u0026 PathPrefix(`/bar`)", + "priority": 12, + "tls": { + "options": "default-mytlsoption" + }, + "status": "enabled", + "using": [ + "web" + ] + } + }, + "services": { + "api@internal": { + "status": "enabled", + "usedBy": [ + "default-api-route-29f28a463fb5d5ba16d2@kubernetescrd" + ] + }, + "dashboard@internal": { + "status": "enabled" + }, + "default-test-route-6b204d94623b3df4370c@kubernetescrd": { + "loadBalancer": { + "servers": [ + { + "url": "http://10.42.0.3:80" + }, + { + "url": "http://10.42.0.4:80" + } + ], + "passHostHeader": true + }, + "status": "enabled", + "usedBy": [ + "default-test-route-6b204d94623b3df4370c@kubernetescrd" + ], + "serverStatus": { + "http://10.42.0.3:80": "UP", + "http://10.42.0.4:80": "UP" + } + }, + "noop@internal": { + "status": "enabled" + } + } +} \ No newline at end of file diff --git a/integration/testdata/rawdata-ingress-label-selector.json b/integration/testdata/rawdata-ingress-label-selector.json new file mode 100644 index 000000000..66d3b095e --- /dev/null +++ b/integration/testdata/rawdata-ingress-label-selector.json @@ -0,0 +1,106 @@ +{ + "routers": { + "api@internal": { + "entryPoints": [ + "traefik" + ], + "service": "api@internal", + "rule": "PathPrefix(`/api`)", + "priority": 2147483646, + "status": "enabled", + "using": [ + "traefik" + ] + }, + "dashboard@internal": { + "entryPoints": [ + "traefik" + ], + "middlewares": [ + "dashboard_redirect@internal", + "dashboard_stripprefix@internal" + ], + "service": "dashboard@internal", + "rule": "PathPrefix(`/`)", + "priority": 2147483645, + "status": "enabled", + "using": [ + "traefik" + ] + }, + "test-ingress-default-whoami-test-whoami@kubernetes": { + "entryPoints": [ + "web" + ], + "service": "default-whoami-http", + "rule": "Host(`whoami.test`) \u0026\u0026 PathPrefix(`/whoami`)", + "status": "enabled", + "using": [ + "web" + ] + } + }, + "middlewares": { + "dashboard_redirect@internal": { + "redirectRegex": { + "regex": "^(http:\\/\\/(\\[[\\w:.]+\\]|[\\w\\._-]+)(:\\d+)?)\\/$", + "replacement": "${1}/dashboard/", + "permanent": true + }, + "status": "enabled", + "usedBy": [ + "dashboard@internal" + ] + }, + "dashboard_stripprefix@internal": { + "stripPrefix": { + "prefixes": [ + "/dashboard/", + "/dashboard" + ] + }, + "status": "enabled", + "usedBy": [ + "dashboard@internal" + ] + } + }, + "services": { + "api@internal": { + "status": "enabled", + "usedBy": [ + "api@internal" + ] + }, + "dashboard@internal": { + "status": "enabled", + "usedBy": [ + "dashboard@internal" + ] + }, + "default-whoami-http@kubernetes": { + "loadBalancer": { + "servers": [ + { + "url": "http://10.42.0.2:80" + }, + { + "url": "http://10.42.0.7:80" + } + ], + "passHostHeader": true + }, + "status": "enabled", + "usedBy": [ + "test-ingress-default-whoami-test-whoami@kubernetes" + ], + "serverStatus": { + "http://10.42.0.2:80": "UP", + "http://10.42.0.7:80": "UP" + } + }, + "noop@internal": { + "status": "enabled" + } + } +} \ No newline at end of file diff --git a/pkg/provider/kubernetes/crd/client.go b/pkg/provider/kubernetes/crd/client.go index 26f045d54..09bfc2a66 100644 --- a/pkg/provider/kubernetes/crd/client.go +++ b/pkg/provider/kubernetes/crd/client.go @@ -17,7 +17,6 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" ) @@ -68,7 +67,7 @@ type clientWrapper struct { factoriesKube map[string]informers.SharedInformerFactory factoriesSecret map[string]informers.SharedInformerFactory - labelSelector labels.Selector + labelSelector string isNamespaceAll bool watchedNamespaces []string @@ -149,20 +148,25 @@ func newExternalClusterClient(endpoint, token, caFilePath string) (*clientWrappe // WatchAll starts namespace-specific controllers for all relevant kinds. func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) { eventCh := make(chan interface{}, 1) - eventHandler := c.newResourceEventHandler(eventCh) + eventHandler := &resourceEventHandler{ev: eventCh} if len(namespaces) == 0 { namespaces = []string{metav1.NamespaceAll} c.isNamespaceAll = true } + c.watchedNamespaces = namespaces notOwnedByHelm := func(opts *metav1.ListOptions) { opts.LabelSelector = "owner!=helm" } + matchesLabelSelector := func(opts *metav1.ListOptions) { + opts.LabelSelector = c.labelSelector + } + for _, ns := range namespaces { - factoryCrd := externalversions.NewSharedInformerFactoryWithOptions(c.csCrd, resyncPeriod, externalversions.WithNamespace(ns)) + factoryCrd := externalversions.NewSharedInformerFactoryWithOptions(c.csCrd, resyncPeriod, externalversions.WithNamespace(ns), externalversions.WithTweakListOptions(matchesLabelSelector)) factoryCrd.Traefik().V1alpha1().IngressRoutes().Informer().AddEventHandler(eventHandler) factoryCrd.Traefik().V1alpha1().Middlewares().Informer().AddEventHandler(eventHandler) factoryCrd.Traefik().V1alpha1().IngressRouteTCPs().Informer().AddEventHandler(eventHandler) @@ -172,7 +176,6 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< factoryCrd.Traefik().V1alpha1().TraefikServices().Informer().AddEventHandler(eventHandler) factoryKube := informers.NewSharedInformerFactoryWithOptions(c.csKube, resyncPeriod, informers.WithNamespace(ns)) - factoryKube.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler) @@ -217,7 +220,7 @@ func (c *clientWrapper) GetIngressRoutes() []*v1alpha1.IngressRoute { var result []*v1alpha1.IngressRoute for ns, factory := range c.factoriesCrd { - ings, err := factory.Traefik().V1alpha1().IngressRoutes().Lister().List(c.labelSelector) + ings, err := factory.Traefik().V1alpha1().IngressRoutes().Lister().List(labels.Everything()) if err != nil { log.Errorf("Failed to list ingress routes in namespace %s: %v", ns, err) } @@ -231,7 +234,7 @@ func (c *clientWrapper) GetIngressRouteTCPs() []*v1alpha1.IngressRouteTCP { var result []*v1alpha1.IngressRouteTCP for ns, factory := range c.factoriesCrd { - ings, err := factory.Traefik().V1alpha1().IngressRouteTCPs().Lister().List(c.labelSelector) + ings, err := factory.Traefik().V1alpha1().IngressRouteTCPs().Lister().List(labels.Everything()) if err != nil { log.Errorf("Failed to list tcp ingress routes in namespace %s: %v", ns, err) } @@ -245,7 +248,7 @@ func (c *clientWrapper) GetIngressRouteUDPs() []*v1alpha1.IngressRouteUDP { var result []*v1alpha1.IngressRouteUDP for ns, factory := range c.factoriesCrd { - ings, err := factory.Traefik().V1alpha1().IngressRouteUDPs().Lister().List(c.labelSelector) + ings, err := factory.Traefik().V1alpha1().IngressRouteUDPs().Lister().List(labels.Everything()) if err != nil { log.Errorf("Failed to list udp ingress routes in namespace %s: %v", ns, err) } @@ -259,7 +262,7 @@ func (c *clientWrapper) GetMiddlewares() []*v1alpha1.Middleware { var result []*v1alpha1.Middleware for ns, factory := range c.factoriesCrd { - middlewares, err := factory.Traefik().V1alpha1().Middlewares().Lister().List(c.labelSelector) + middlewares, err := factory.Traefik().V1alpha1().Middlewares().Lister().List(labels.Everything()) if err != nil { log.Errorf("Failed to list middlewares in namespace %s: %v", ns, err) } @@ -285,7 +288,7 @@ func (c *clientWrapper) GetTraefikServices() []*v1alpha1.TraefikService { var result []*v1alpha1.TraefikService for ns, factory := range c.factoriesCrd { - ings, err := factory.Traefik().V1alpha1().TraefikServices().Lister().List(c.labelSelector) + ings, err := factory.Traefik().V1alpha1().TraefikServices().Lister().List(labels.Everything()) if err != nil { log.Errorf("Failed to list Traefik services in namespace %s: %v", ns, err) } @@ -300,7 +303,7 @@ func (c *clientWrapper) GetTLSOptions() []*v1alpha1.TLSOption { var result []*v1alpha1.TLSOption for ns, factory := range c.factoriesCrd { - options, err := factory.Traefik().V1alpha1().TLSOptions().Lister().List(c.labelSelector) + options, err := factory.Traefik().V1alpha1().TLSOptions().Lister().List(labels.Everything()) if err != nil { log.Errorf("Failed to list tls options in namespace %s: %v", ns, err) } @@ -315,7 +318,7 @@ func (c *clientWrapper) GetTLSStores() []*v1alpha1.TLSStore { var result []*v1alpha1.TLSStore for ns, factory := range c.factoriesCrd { - stores, err := factory.Traefik().V1alpha1().TLSStores().Lister().List(c.labelSelector) + stores, err := factory.Traefik().V1alpha1().TLSStores().Lister().List(labels.Everything()) if err != nil { log.Errorf("Failed to list tls stores in namespace %s: %v", ns, err) } @@ -371,31 +374,6 @@ func (c *clientWrapper) lookupNamespace(ns string) string { return ns } -func (c *clientWrapper) newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler { - return &cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - // Ignore Ingresses that do not match our custom label selector. - switch v := obj.(type) { - case *v1alpha1.IngressRoute: - return c.labelSelector.Matches(labels.Set(v.GetLabels())) - case *v1alpha1.IngressRouteTCP: - return c.labelSelector.Matches(labels.Set(v.GetLabels())) - case *v1alpha1.TraefikService: - return c.labelSelector.Matches(labels.Set(v.GetLabels())) - case *v1alpha1.TLSOption: - return c.labelSelector.Matches(labels.Set(v.GetLabels())) - case *v1alpha1.TLSStore: - return c.labelSelector.Matches(labels.Set(v.GetLabels())) - case *v1alpha1.Middleware: - return c.labelSelector.Matches(labels.Set(v.GetLabels())) - default: - return true - } - }, - Handler: &resourceEventHandler{ev: events}, - } -} - // eventHandlerFunc will pass the obj on to the events channel or drop it. // This is so passing the events along won't block in the case of high volume. // The events are only used for signaling anyway so dropping a few is ok. diff --git a/pkg/provider/kubernetes/crd/client_test.go b/pkg/provider/kubernetes/crd/client_test.go index a4113b56f..42aa574ee 100644 --- a/pkg/provider/kubernetes/crd/client_test.go +++ b/pkg/provider/kubernetes/crd/client_test.go @@ -34,7 +34,9 @@ func TestClientIgnoresHelmOwnedSecrets(t *testing.T) { client := newClientImpl(kubeClient, crdClient) - eventCh, err := client.WatchAll(nil, nil) + stopCh := make(chan struct{}) + + eventCh, err := client.WatchAll(nil, stopCh) require.NoError(t, err) select { diff --git a/pkg/provider/kubernetes/crd/kubernetes.go b/pkg/provider/kubernetes/crd/kubernetes.go index d5b978a73..3d3678a83 100644 --- a/pkg/provider/kubernetes/crd/kubernetes.go +++ b/pkg/provider/kubernetes/crd/kubernetes.go @@ -49,12 +49,12 @@ type Provider struct { lastConfiguration safe.Safe } -func (p *Provider) newK8sClient(ctx context.Context, labelSelector string) (*clientWrapper, error) { - labelSel, err := labels.Parse(labelSelector) +func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) { + _, err := labels.Parse(p.LabelSelector) if err != nil { - return nil, fmt.Errorf("invalid label selector: %q", labelSelector) + return nil, fmt.Errorf("invalid label selector: %q", p.LabelSelector) } - log.FromContext(ctx).Infof("label selector is: %q", labelSel) + log.FromContext(ctx).Infof("label selector is: %q", p.LabelSelector) withEndpoint := "" if p.Endpoint != "" { @@ -74,11 +74,12 @@ func (p *Provider) newK8sClient(ctx context.Context, labelSelector string) (*cli client, err = newExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath) } - if err == nil { - client.labelSelector = labelSel + if err != nil { + return nil, err } - return client, err + client.labelSelector = p.LabelSelector + return client, nil } // Init the provider. @@ -92,8 +93,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. ctxLog := log.With(context.Background(), log.Str(log.ProviderName, providerName)) logger := log.FromContext(ctxLog) - logger.Debugf("Using label selector: %q", p.LabelSelector) - k8sClient, err := p.newK8sClient(ctxLog, p.LabelSelector) + k8sClient, err := p.newK8sClient(ctxLog) if err != nil { return err } diff --git a/pkg/provider/kubernetes/ingress/client.go b/pkg/provider/kubernetes/ingress/client.go index e54887e75..c40dd9a62 100644 --- a/pkg/provider/kubernetes/ingress/client.go +++ b/pkg/provider/kubernetes/ingress/client.go @@ -19,7 +19,6 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" ) @@ -62,8 +61,9 @@ type clientWrapper struct { clientset kubernetes.Interface factoriesKube map[string]informers.SharedInformerFactory factoriesSecret map[string]informers.SharedInformerFactory + factoriesIngress map[string]informers.SharedInformerFactory clusterFactory informers.SharedInformerFactory - ingressLabelSelector labels.Selector + ingressLabelSelector string isNamespaceAll bool watchedNamespaces []string } @@ -126,16 +126,17 @@ func createClientFromConfig(c *rest.Config) (*clientWrapper, error) { func newClientImpl(clientset kubernetes.Interface) *clientWrapper { return &clientWrapper{ - clientset: clientset, - factoriesKube: make(map[string]informers.SharedInformerFactory), - factoriesSecret: make(map[string]informers.SharedInformerFactory), + clientset: clientset, + factoriesSecret: make(map[string]informers.SharedInformerFactory), + factoriesIngress: make(map[string]informers.SharedInformerFactory), + factoriesKube: make(map[string]informers.SharedInformerFactory), } } // WatchAll starts namespace-specific controllers for all relevant kinds. func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) { eventCh := make(chan interface{}, 1) - eventHandler := c.newResourceEventHandler(eventCh) + eventHandler := &resourceEventHandler{eventCh} if len(namespaces) == 0 { namespaces = []string{metav1.NamespaceAll} @@ -148,25 +149,38 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< opts.LabelSelector = "owner!=helm" } + matchesLabelSelector := func(opts *metav1.ListOptions) { + opts.LabelSelector = c.ingressLabelSelector + } + for _, ns := range namespaces { + factoryIngress := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOptions(matchesLabelSelector)) + factoryIngress.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler) + c.factoriesIngress[ns] = factoryIngress + factoryKube := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns)) - factoryKube.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler) + c.factoriesKube[ns] = factoryKube factorySecret := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOptions(notOwnedByHelm)) factorySecret.Core().V1().Secrets().Informer().AddEventHandler(eventHandler) - - c.factoriesKube[ns] = factoryKube c.factoriesSecret[ns] = factorySecret } for _, ns := range namespaces { + c.factoriesIngress[ns].Start(stopCh) c.factoriesKube[ns].Start(stopCh) c.factoriesSecret[ns].Start(stopCh) } for _, ns := range namespaces { + for typ, ok := range c.factoriesIngress[ns].WaitForCacheSync(stopCh) { + if !ok { + return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", typ, ns) + } + } + for typ, ok := range c.factoriesKube[ns].WaitForCacheSync(stopCh) { if !ok { return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", typ, ns) @@ -205,9 +219,9 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress { var results []*networkingv1beta1.Ingress - for ns, factory := range c.factoriesKube { + for ns, factory := range c.factoriesIngress { // extensions - ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(c.ingressLabelSelector) + ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(labels.Everything()) if err != nil { log.Errorf("Failed to list ingresses in namespace %s: %v", ns, err) } @@ -222,7 +236,7 @@ func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress { } // networking - list, err := factory.Networking().V1beta1().Ingresses().Lister().List(c.ingressLabelSelector) + list, err := factory.Networking().V1beta1().Ingresses().Lister().List(labels.Everything()) if err != nil { log.Errorf("Failed to list ingresses in namespace %s: %v", ns, err) } @@ -256,7 +270,7 @@ func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip, return c.updateIngressStatusOld(src, ip, hostname) } - ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Networking().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) + ing, err := c.factoriesIngress[c.lookupNamespace(src.Namespace)].Networking().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) if err != nil { return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err) } @@ -285,7 +299,7 @@ func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip, } func (c *clientWrapper) updateIngressStatusOld(src *networkingv1beta1.Ingress, ip, hostname string) error { - ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Extensions().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) + ing, err := c.factoriesIngress[c.lookupNamespace(src.Namespace)].Extensions().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) if err != nil { return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err) } @@ -378,25 +392,6 @@ func (c *clientWrapper) lookupNamespace(ns string) string { return ns } -func (c *clientWrapper) newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler { - return &cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - // Ignore Ingresses that do not match our custom label selector. - switch v := obj.(type) { - case *extensionsv1beta1.Ingress: - lbls := labels.Set(v.GetLabels()) - return c.ingressLabelSelector.Matches(lbls) - case *networkingv1beta1.Ingress: - lbls := labels.Set(v.GetLabels()) - return c.ingressLabelSelector.Matches(lbls) - default: - return true - } - }, - Handler: &resourceEventHandler{ev: events}, - } -} - // GetServerVersion returns the cluster server version, or an error. func (c *clientWrapper) GetServerVersion() (*version.Version, error) { serverVersion, err := c.clientset.Discovery().ServerVersion() diff --git a/pkg/provider/kubernetes/ingress/kubernetes.go b/pkg/provider/kubernetes/ingress/kubernetes.go index 5173bbd8b..f5944edb1 100644 --- a/pkg/provider/kubernetes/ingress/kubernetes.go +++ b/pkg/provider/kubernetes/ingress/kubernetes.go @@ -53,15 +53,15 @@ type EndpointIngress struct { PublishedService string `description:"Published Kubernetes Service to copy status from." json:"publishedService,omitempty" toml:"publishedService,omitempty" yaml:"publishedService,omitempty"` } -func (p *Provider) newK8sClient(ctx context.Context, ingressLabelSelector string) (*clientWrapper, error) { - ingLabelSel, err := labels.Parse(ingressLabelSelector) +func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) { + _, err := labels.Parse(p.LabelSelector) if err != nil { - return nil, fmt.Errorf("invalid ingress label selector: %q", ingressLabelSelector) + return nil, fmt.Errorf("invalid ingress label selector: %q", p.LabelSelector) } logger := log.FromContext(ctx) - logger.Infof("ingress label selector is: %q", ingLabelSel) + logger.Infof("ingress label selector is: %q", p.LabelSelector) withEndpoint := "" if p.Endpoint != "" { @@ -81,11 +81,12 @@ func (p *Provider) newK8sClient(ctx context.Context, ingressLabelSelector string cl, err = newExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath) } - if err == nil { - cl.ingressLabelSelector = ingLabelSel + if err != nil { + return nil, err } - return cl, err + cl.ingressLabelSelector = p.LabelSelector + return cl, nil } // Init the provider. @@ -99,8 +100,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. ctxLog := log.With(context.Background(), log.Str(log.ProviderName, "kubernetes")) logger := log.FromContext(ctxLog) - logger.Debugf("Using Ingress label selector: %q", p.LabelSelector) - k8sClient, err := p.newK8sClient(ctxLog, p.LabelSelector) + k8sClient, err := p.newK8sClient(ctxLog) if err != nil { return err }