Apply labelSelector as a TweakListOptions for Kubernetes informers

This commit is contained in:
Romain 2020-11-20 00:18:04 +01:00 committed by GitHub
parent f83a57b3da
commit be0845af02
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 317 additions and 98 deletions

View file

@ -177,26 +177,32 @@ _Optional,Default: empty (process all resources)_
```toml tab="File (TOML)" ```toml tab="File (TOML)"
[providers.kubernetesCRD] [providers.kubernetesCRD]
labelselector = "A and not B" labelselector = "app=traefik"
# ... # ...
``` ```
```yaml tab="File (YAML)" ```yaml tab="File (YAML)"
providers: providers:
kubernetesCRD: kubernetesCRD:
labelselector: "A and not B" labelselector: "app=traefik"
# ... # ...
``` ```
```bash tab="CLI" ```bash tab="CLI"
--providers.kubernetescrd.labelselector="A and not B" --providers.kubernetescrd.labelselector="app=traefik"
``` ```
By default, Traefik processes all resource objects in the configured namespaces. By default, Traefik processes all resource objects in the configured namespaces.
A label selector can be defined to filter on specific resource objects only. A label selector can be defined to filter on specific resource objects only,
this will apply only on Traefik [Custom Resources](../routing/providers/kubernetes-crd.md#custom-resource-definition-crd)
and has no effect on Kubernetes `Secrets`, `Endpoints` and `Services`.
See [label-selectors](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors) for details. See [label-selectors](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors) for details.
!!! warning
As the LabelSelector is applied to all Traefik Custom Resources, they all must match the filter.
### `ingressClass` ### `ingressClass`
_Optional, Default: empty_ _Optional, Default: empty_

View file

@ -212,23 +212,23 @@ _Optional,Default: empty (process all Ingresses)_
```toml tab="File (TOML)" ```toml tab="File (TOML)"
[providers.kubernetesIngress] [providers.kubernetesIngress]
labelSelector = "A and not B" labelSelector = "app=traefik"
# ... # ...
``` ```
```yaml tab="File (YAML)" ```yaml tab="File (YAML)"
providers: providers:
kubernetesIngress: kubernetesIngress:
labelselector: "A and not B" labelselector: "app=traefik"
# ... # ...
``` ```
```bash tab="CLI" ```bash tab="CLI"
--providers.kubernetesingress.labelselector="A and not B" --providers.kubernetesingress.labelselector="app=traefik"
``` ```
By default, Traefik processes all Ingress objects in the configured namespaces. By default, Traefik processes all `Ingress` objects in the configured namespaces.
A label selector can be defined to filter on specific Ingress objects only. A label selector can be defined to filter on specific `Ingress` objects only.
See [label-selectors](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors) for details. See [label-selectors](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors) for details.

View file

@ -3,6 +3,8 @@ kind: Ingress
metadata: metadata:
name: test.ingress name: test.ingress
namespace: default namespace: default
labels:
app: traefik
spec: spec:
rules: rules:

View file

@ -3,6 +3,8 @@ kind: IngressRoute
metadata: metadata:
name: test.route name: test.route
namespace: default namespace: default
labels:
app: traefik
spec: spec:
entryPoints: entryPoints:

View file

@ -3,6 +3,8 @@ kind: TLSOption
metadata: metadata:
name: mytlsoption name: mytlsoption
namespace: default namespace: default
labels:
app: traefik
spec: spec:
minVersion: VersionTLS12 minVersion: VersionTLS12

View file

@ -3,6 +3,8 @@ kind: TLSStore
metadata: metadata:
name: mytlsstore name: mytlsstore
namespace: default namespace: default
labels:
app: traefik
spec: spec:
defaultCertificate: defaultCertificate:

View file

@ -50,6 +50,8 @@ kind: IngressRoute
metadata: metadata:
name: api.route name: api.route
namespace: default namespace: default
labels:
app: traefik
spec: spec:
entryPoints: entryPoints:

View file

@ -0,0 +1,19 @@
[global]
checkNewVersion = false
sendAnonymousUsage = false
[log]
level = "DEBUG"
[api]
[entryPoints]
[entryPoints.footcp]
address = ":8093"
[entryPoints.fooudp]
address = ":8090/udp"
[entryPoints.web]
address = ":8000"
[providers.kubernetesCRD]
labelSelector = "app=traefik"

View file

@ -0,0 +1,16 @@
[global]
checkNewVersion = false
sendAnonymousUsage = false
[log]
level = "DEBUG"
[api]
insecure = true
[entryPoints]
[entryPoints.web]
address = ":8000"
[providers.kubernetesIngress]
labelSelector = "app=traefik"

View file

@ -74,6 +74,17 @@ func (s *K8sSuite) TestIngressConfiguration(c *check.C) {
testConfiguration(c, "testdata/rawdata-ingress.json", "8080") testConfiguration(c, "testdata/rawdata-ingress.json", "8080")
} }
func (s *K8sSuite) TestIngressLabelSelector(c *check.C) {
cmd, display := s.traefikCmd(withConfigFile("fixtures/k8s_ingress_label_selector.toml"))
defer display(c)
err := cmd.Start()
c.Assert(err, checker.IsNil)
defer s.killCmd(cmd)
testConfiguration(c, "testdata/rawdata-ingress-label-selector.json", "8080")
}
func (s *K8sSuite) TestCRDConfiguration(c *check.C) { func (s *K8sSuite) TestCRDConfiguration(c *check.C) {
cmd, display := s.traefikCmd(withConfigFile("fixtures/k8s_crd.toml")) cmd, display := s.traefikCmd(withConfigFile("fixtures/k8s_crd.toml"))
defer display(c) defer display(c)
@ -85,6 +96,17 @@ func (s *K8sSuite) TestCRDConfiguration(c *check.C) {
testConfiguration(c, "testdata/rawdata-crd.json", "8000") testConfiguration(c, "testdata/rawdata-crd.json", "8000")
} }
func (s *K8sSuite) TestCRDLabelSelector(c *check.C) {
cmd, display := s.traefikCmd(withConfigFile("fixtures/k8s_crd_label_selector.toml"))
defer display(c)
err := cmd.Start()
c.Assert(err, checker.IsNil)
defer s.killCmd(cmd)
testConfiguration(c, "testdata/rawdata-crd-label-selector.json", "8000")
}
func testConfiguration(c *check.C, path, apiPort string) { func testConfiguration(c *check.C, path, apiPort string) {
err := try.GetRequest("http://127.0.0.1:"+apiPort+"/api/entrypoints", 20*time.Second, try.BodyContains(`"name":"web"`)) err := try.GetRequest("http://127.0.0.1:"+apiPort+"/api/entrypoints", 20*time.Second, try.BodyContains(`"name":"web"`))
c.Assert(err, checker.IsNil) c.Assert(err, checker.IsNil)

View file

@ -0,0 +1,65 @@
{
"routers": {
"default-api-route-29f28a463fb5d5ba16d2@kubernetescrd": {
"entryPoints": [
"web"
],
"service": "api@internal",
"rule": "PathPrefix(`/api`)",
"status": "enabled",
"using": [
"web"
]
},
"default-test-route-6b204d94623b3df4370c@kubernetescrd": {
"entryPoints": [
"web"
],
"service": "default-test-route-6b204d94623b3df4370c",
"rule": "Host(`foo.com`) \u0026\u0026 PathPrefix(`/bar`)",
"priority": 12,
"tls": {
"options": "default-mytlsoption"
},
"status": "enabled",
"using": [
"web"
]
}
},
"services": {
"api@internal": {
"status": "enabled",
"usedBy": [
"default-api-route-29f28a463fb5d5ba16d2@kubernetescrd"
]
},
"dashboard@internal": {
"status": "enabled"
},
"default-test-route-6b204d94623b3df4370c@kubernetescrd": {
"loadBalancer": {
"servers": [
{
"url": "http://10.42.0.3:80"
},
{
"url": "http://10.42.0.4:80"
}
],
"passHostHeader": true
},
"status": "enabled",
"usedBy": [
"default-test-route-6b204d94623b3df4370c@kubernetescrd"
],
"serverStatus": {
"http://10.42.0.3:80": "UP",
"http://10.42.0.4:80": "UP"
}
},
"noop@internal": {
"status": "enabled"
}
}
}

View file

@ -0,0 +1,106 @@
{
"routers": {
"api@internal": {
"entryPoints": [
"traefik"
],
"service": "api@internal",
"rule": "PathPrefix(`/api`)",
"priority": 2147483646,
"status": "enabled",
"using": [
"traefik"
]
},
"dashboard@internal": {
"entryPoints": [
"traefik"
],
"middlewares": [
"dashboard_redirect@internal",
"dashboard_stripprefix@internal"
],
"service": "dashboard@internal",
"rule": "PathPrefix(`/`)",
"priority": 2147483645,
"status": "enabled",
"using": [
"traefik"
]
},
"test-ingress-default-whoami-test-whoami@kubernetes": {
"entryPoints": [
"web"
],
"service": "default-whoami-http",
"rule": "Host(`whoami.test`) \u0026\u0026 PathPrefix(`/whoami`)",
"status": "enabled",
"using": [
"web"
]
}
},
"middlewares": {
"dashboard_redirect@internal": {
"redirectRegex": {
"regex": "^(http:\\/\\/(\\[[\\w:.]+\\]|[\\w\\._-]+)(:\\d+)?)\\/$",
"replacement": "${1}/dashboard/",
"permanent": true
},
"status": "enabled",
"usedBy": [
"dashboard@internal"
]
},
"dashboard_stripprefix@internal": {
"stripPrefix": {
"prefixes": [
"/dashboard/",
"/dashboard"
]
},
"status": "enabled",
"usedBy": [
"dashboard@internal"
]
}
},
"services": {
"api@internal": {
"status": "enabled",
"usedBy": [
"api@internal"
]
},
"dashboard@internal": {
"status": "enabled",
"usedBy": [
"dashboard@internal"
]
},
"default-whoami-http@kubernetes": {
"loadBalancer": {
"servers": [
{
"url": "http://10.42.0.2:80"
},
{
"url": "http://10.42.0.7:80"
}
],
"passHostHeader": true
},
"status": "enabled",
"usedBy": [
"test-ingress-default-whoami-test-whoami@kubernetes"
],
"serverStatus": {
"http://10.42.0.2:80": "UP",
"http://10.42.0.7:80": "UP"
}
},
"noop@internal": {
"status": "enabled"
}
}
}

View file

@ -17,7 +17,6 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
) )
@ -68,7 +67,7 @@ type clientWrapper struct {
factoriesKube map[string]informers.SharedInformerFactory factoriesKube map[string]informers.SharedInformerFactory
factoriesSecret map[string]informers.SharedInformerFactory factoriesSecret map[string]informers.SharedInformerFactory
labelSelector labels.Selector labelSelector string
isNamespaceAll bool isNamespaceAll bool
watchedNamespaces []string watchedNamespaces []string
@ -149,20 +148,25 @@ func newExternalClusterClient(endpoint, token, caFilePath string) (*clientWrappe
// WatchAll starts namespace-specific controllers for all relevant kinds. // WatchAll starts namespace-specific controllers for all relevant kinds.
func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) { func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) {
eventCh := make(chan interface{}, 1) eventCh := make(chan interface{}, 1)
eventHandler := c.newResourceEventHandler(eventCh) eventHandler := &resourceEventHandler{ev: eventCh}
if len(namespaces) == 0 { if len(namespaces) == 0 {
namespaces = []string{metav1.NamespaceAll} namespaces = []string{metav1.NamespaceAll}
c.isNamespaceAll = true c.isNamespaceAll = true
} }
c.watchedNamespaces = namespaces c.watchedNamespaces = namespaces
notOwnedByHelm := func(opts *metav1.ListOptions) { notOwnedByHelm := func(opts *metav1.ListOptions) {
opts.LabelSelector = "owner!=helm" opts.LabelSelector = "owner!=helm"
} }
matchesLabelSelector := func(opts *metav1.ListOptions) {
opts.LabelSelector = c.labelSelector
}
for _, ns := range namespaces { for _, ns := range namespaces {
factoryCrd := externalversions.NewSharedInformerFactoryWithOptions(c.csCrd, resyncPeriod, externalversions.WithNamespace(ns)) factoryCrd := externalversions.NewSharedInformerFactoryWithOptions(c.csCrd, resyncPeriod, externalversions.WithNamespace(ns), externalversions.WithTweakListOptions(matchesLabelSelector))
factoryCrd.Traefik().V1alpha1().IngressRoutes().Informer().AddEventHandler(eventHandler) factoryCrd.Traefik().V1alpha1().IngressRoutes().Informer().AddEventHandler(eventHandler)
factoryCrd.Traefik().V1alpha1().Middlewares().Informer().AddEventHandler(eventHandler) factoryCrd.Traefik().V1alpha1().Middlewares().Informer().AddEventHandler(eventHandler)
factoryCrd.Traefik().V1alpha1().IngressRouteTCPs().Informer().AddEventHandler(eventHandler) factoryCrd.Traefik().V1alpha1().IngressRouteTCPs().Informer().AddEventHandler(eventHandler)
@ -172,7 +176,6 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
factoryCrd.Traefik().V1alpha1().TraefikServices().Informer().AddEventHandler(eventHandler) factoryCrd.Traefik().V1alpha1().TraefikServices().Informer().AddEventHandler(eventHandler)
factoryKube := informers.NewSharedInformerFactoryWithOptions(c.csKube, resyncPeriod, informers.WithNamespace(ns)) factoryKube := informers.NewSharedInformerFactoryWithOptions(c.csKube, resyncPeriod, informers.WithNamespace(ns))
factoryKube.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler)
factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler)
@ -217,7 +220,7 @@ func (c *clientWrapper) GetIngressRoutes() []*v1alpha1.IngressRoute {
var result []*v1alpha1.IngressRoute var result []*v1alpha1.IngressRoute
for ns, factory := range c.factoriesCrd { for ns, factory := range c.factoriesCrd {
ings, err := factory.Traefik().V1alpha1().IngressRoutes().Lister().List(c.labelSelector) ings, err := factory.Traefik().V1alpha1().IngressRoutes().Lister().List(labels.Everything())
if err != nil { if err != nil {
log.Errorf("Failed to list ingress routes in namespace %s: %v", ns, err) log.Errorf("Failed to list ingress routes in namespace %s: %v", ns, err)
} }
@ -231,7 +234,7 @@ func (c *clientWrapper) GetIngressRouteTCPs() []*v1alpha1.IngressRouteTCP {
var result []*v1alpha1.IngressRouteTCP var result []*v1alpha1.IngressRouteTCP
for ns, factory := range c.factoriesCrd { for ns, factory := range c.factoriesCrd {
ings, err := factory.Traefik().V1alpha1().IngressRouteTCPs().Lister().List(c.labelSelector) ings, err := factory.Traefik().V1alpha1().IngressRouteTCPs().Lister().List(labels.Everything())
if err != nil { if err != nil {
log.Errorf("Failed to list tcp ingress routes in namespace %s: %v", ns, err) log.Errorf("Failed to list tcp ingress routes in namespace %s: %v", ns, err)
} }
@ -245,7 +248,7 @@ func (c *clientWrapper) GetIngressRouteUDPs() []*v1alpha1.IngressRouteUDP {
var result []*v1alpha1.IngressRouteUDP var result []*v1alpha1.IngressRouteUDP
for ns, factory := range c.factoriesCrd { for ns, factory := range c.factoriesCrd {
ings, err := factory.Traefik().V1alpha1().IngressRouteUDPs().Lister().List(c.labelSelector) ings, err := factory.Traefik().V1alpha1().IngressRouteUDPs().Lister().List(labels.Everything())
if err != nil { if err != nil {
log.Errorf("Failed to list udp ingress routes in namespace %s: %v", ns, err) log.Errorf("Failed to list udp ingress routes in namespace %s: %v", ns, err)
} }
@ -259,7 +262,7 @@ func (c *clientWrapper) GetMiddlewares() []*v1alpha1.Middleware {
var result []*v1alpha1.Middleware var result []*v1alpha1.Middleware
for ns, factory := range c.factoriesCrd { for ns, factory := range c.factoriesCrd {
middlewares, err := factory.Traefik().V1alpha1().Middlewares().Lister().List(c.labelSelector) middlewares, err := factory.Traefik().V1alpha1().Middlewares().Lister().List(labels.Everything())
if err != nil { if err != nil {
log.Errorf("Failed to list middlewares in namespace %s: %v", ns, err) log.Errorf("Failed to list middlewares in namespace %s: %v", ns, err)
} }
@ -285,7 +288,7 @@ func (c *clientWrapper) GetTraefikServices() []*v1alpha1.TraefikService {
var result []*v1alpha1.TraefikService var result []*v1alpha1.TraefikService
for ns, factory := range c.factoriesCrd { for ns, factory := range c.factoriesCrd {
ings, err := factory.Traefik().V1alpha1().TraefikServices().Lister().List(c.labelSelector) ings, err := factory.Traefik().V1alpha1().TraefikServices().Lister().List(labels.Everything())
if err != nil { if err != nil {
log.Errorf("Failed to list Traefik services in namespace %s: %v", ns, err) log.Errorf("Failed to list Traefik services in namespace %s: %v", ns, err)
} }
@ -300,7 +303,7 @@ func (c *clientWrapper) GetTLSOptions() []*v1alpha1.TLSOption {
var result []*v1alpha1.TLSOption var result []*v1alpha1.TLSOption
for ns, factory := range c.factoriesCrd { for ns, factory := range c.factoriesCrd {
options, err := factory.Traefik().V1alpha1().TLSOptions().Lister().List(c.labelSelector) options, err := factory.Traefik().V1alpha1().TLSOptions().Lister().List(labels.Everything())
if err != nil { if err != nil {
log.Errorf("Failed to list tls options in namespace %s: %v", ns, err) log.Errorf("Failed to list tls options in namespace %s: %v", ns, err)
} }
@ -315,7 +318,7 @@ func (c *clientWrapper) GetTLSStores() []*v1alpha1.TLSStore {
var result []*v1alpha1.TLSStore var result []*v1alpha1.TLSStore
for ns, factory := range c.factoriesCrd { for ns, factory := range c.factoriesCrd {
stores, err := factory.Traefik().V1alpha1().TLSStores().Lister().List(c.labelSelector) stores, err := factory.Traefik().V1alpha1().TLSStores().Lister().List(labels.Everything())
if err != nil { if err != nil {
log.Errorf("Failed to list tls stores in namespace %s: %v", ns, err) log.Errorf("Failed to list tls stores in namespace %s: %v", ns, err)
} }
@ -371,31 +374,6 @@ func (c *clientWrapper) lookupNamespace(ns string) string {
return ns return ns
} }
func (c *clientWrapper) newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler {
return &cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
// Ignore Ingresses that do not match our custom label selector.
switch v := obj.(type) {
case *v1alpha1.IngressRoute:
return c.labelSelector.Matches(labels.Set(v.GetLabels()))
case *v1alpha1.IngressRouteTCP:
return c.labelSelector.Matches(labels.Set(v.GetLabels()))
case *v1alpha1.TraefikService:
return c.labelSelector.Matches(labels.Set(v.GetLabels()))
case *v1alpha1.TLSOption:
return c.labelSelector.Matches(labels.Set(v.GetLabels()))
case *v1alpha1.TLSStore:
return c.labelSelector.Matches(labels.Set(v.GetLabels()))
case *v1alpha1.Middleware:
return c.labelSelector.Matches(labels.Set(v.GetLabels()))
default:
return true
}
},
Handler: &resourceEventHandler{ev: events},
}
}
// eventHandlerFunc will pass the obj on to the events channel or drop it. // eventHandlerFunc will pass the obj on to the events channel or drop it.
// This is so passing the events along won't block in the case of high volume. // This is so passing the events along won't block in the case of high volume.
// The events are only used for signaling anyway so dropping a few is ok. // The events are only used for signaling anyway so dropping a few is ok.

View file

@ -34,7 +34,9 @@ func TestClientIgnoresHelmOwnedSecrets(t *testing.T) {
client := newClientImpl(kubeClient, crdClient) client := newClientImpl(kubeClient, crdClient)
eventCh, err := client.WatchAll(nil, nil) stopCh := make(chan struct{})
eventCh, err := client.WatchAll(nil, stopCh)
require.NoError(t, err) require.NoError(t, err)
select { select {

View file

@ -49,12 +49,12 @@ type Provider struct {
lastConfiguration safe.Safe lastConfiguration safe.Safe
} }
func (p *Provider) newK8sClient(ctx context.Context, labelSelector string) (*clientWrapper, error) { func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) {
labelSel, err := labels.Parse(labelSelector) _, err := labels.Parse(p.LabelSelector)
if err != nil { if err != nil {
return nil, fmt.Errorf("invalid label selector: %q", labelSelector) return nil, fmt.Errorf("invalid label selector: %q", p.LabelSelector)
} }
log.FromContext(ctx).Infof("label selector is: %q", labelSel) log.FromContext(ctx).Infof("label selector is: %q", p.LabelSelector)
withEndpoint := "" withEndpoint := ""
if p.Endpoint != "" { if p.Endpoint != "" {
@ -74,11 +74,12 @@ func (p *Provider) newK8sClient(ctx context.Context, labelSelector string) (*cli
client, err = newExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath) client, err = newExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath)
} }
if err == nil { if err != nil {
client.labelSelector = labelSel return nil, err
} }
return client, err client.labelSelector = p.LabelSelector
return client, nil
} }
// Init the provider. // Init the provider.
@ -92,8 +93,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
ctxLog := log.With(context.Background(), log.Str(log.ProviderName, providerName)) ctxLog := log.With(context.Background(), log.Str(log.ProviderName, providerName))
logger := log.FromContext(ctxLog) logger := log.FromContext(ctxLog)
logger.Debugf("Using label selector: %q", p.LabelSelector) k8sClient, err := p.newK8sClient(ctxLog)
k8sClient, err := p.newK8sClient(ctxLog, p.LabelSelector)
if err != nil { if err != nil {
return err return err
} }

View file

@ -19,7 +19,6 @@ import (
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/clientcmd"
) )
@ -62,8 +61,9 @@ type clientWrapper struct {
clientset kubernetes.Interface clientset kubernetes.Interface
factoriesKube map[string]informers.SharedInformerFactory factoriesKube map[string]informers.SharedInformerFactory
factoriesSecret map[string]informers.SharedInformerFactory factoriesSecret map[string]informers.SharedInformerFactory
factoriesIngress map[string]informers.SharedInformerFactory
clusterFactory informers.SharedInformerFactory clusterFactory informers.SharedInformerFactory
ingressLabelSelector labels.Selector ingressLabelSelector string
isNamespaceAll bool isNamespaceAll bool
watchedNamespaces []string watchedNamespaces []string
} }
@ -126,16 +126,17 @@ func createClientFromConfig(c *rest.Config) (*clientWrapper, error) {
func newClientImpl(clientset kubernetes.Interface) *clientWrapper { func newClientImpl(clientset kubernetes.Interface) *clientWrapper {
return &clientWrapper{ return &clientWrapper{
clientset: clientset, clientset: clientset,
factoriesKube: make(map[string]informers.SharedInformerFactory), factoriesSecret: make(map[string]informers.SharedInformerFactory),
factoriesSecret: make(map[string]informers.SharedInformerFactory), factoriesIngress: make(map[string]informers.SharedInformerFactory),
factoriesKube: make(map[string]informers.SharedInformerFactory),
} }
} }
// WatchAll starts namespace-specific controllers for all relevant kinds. // WatchAll starts namespace-specific controllers for all relevant kinds.
func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) { func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) {
eventCh := make(chan interface{}, 1) eventCh := make(chan interface{}, 1)
eventHandler := c.newResourceEventHandler(eventCh) eventHandler := &resourceEventHandler{eventCh}
if len(namespaces) == 0 { if len(namespaces) == 0 {
namespaces = []string{metav1.NamespaceAll} namespaces = []string{metav1.NamespaceAll}
@ -148,25 +149,38 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
opts.LabelSelector = "owner!=helm" opts.LabelSelector = "owner!=helm"
} }
matchesLabelSelector := func(opts *metav1.ListOptions) {
opts.LabelSelector = c.ingressLabelSelector
}
for _, ns := range namespaces { for _, ns := range namespaces {
factoryIngress := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOptions(matchesLabelSelector))
factoryIngress.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
c.factoriesIngress[ns] = factoryIngress
factoryKube := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns)) factoryKube := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns))
factoryKube.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler)
factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler)
c.factoriesKube[ns] = factoryKube
factorySecret := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOptions(notOwnedByHelm)) factorySecret := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOptions(notOwnedByHelm))
factorySecret.Core().V1().Secrets().Informer().AddEventHandler(eventHandler) factorySecret.Core().V1().Secrets().Informer().AddEventHandler(eventHandler)
c.factoriesKube[ns] = factoryKube
c.factoriesSecret[ns] = factorySecret c.factoriesSecret[ns] = factorySecret
} }
for _, ns := range namespaces { for _, ns := range namespaces {
c.factoriesIngress[ns].Start(stopCh)
c.factoriesKube[ns].Start(stopCh) c.factoriesKube[ns].Start(stopCh)
c.factoriesSecret[ns].Start(stopCh) c.factoriesSecret[ns].Start(stopCh)
} }
for _, ns := range namespaces { for _, ns := range namespaces {
for typ, ok := range c.factoriesIngress[ns].WaitForCacheSync(stopCh) {
if !ok {
return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", typ, ns)
}
}
for typ, ok := range c.factoriesKube[ns].WaitForCacheSync(stopCh) { for typ, ok := range c.factoriesKube[ns].WaitForCacheSync(stopCh) {
if !ok { if !ok {
return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", typ, ns) return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", typ, ns)
@ -205,9 +219,9 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (<
func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress { func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress {
var results []*networkingv1beta1.Ingress var results []*networkingv1beta1.Ingress
for ns, factory := range c.factoriesKube { for ns, factory := range c.factoriesIngress {
// extensions // extensions
ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(c.ingressLabelSelector) ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(labels.Everything())
if err != nil { if err != nil {
log.Errorf("Failed to list ingresses in namespace %s: %v", ns, err) log.Errorf("Failed to list ingresses in namespace %s: %v", ns, err)
} }
@ -222,7 +236,7 @@ func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress {
} }
// networking // networking
list, err := factory.Networking().V1beta1().Ingresses().Lister().List(c.ingressLabelSelector) list, err := factory.Networking().V1beta1().Ingresses().Lister().List(labels.Everything())
if err != nil { if err != nil {
log.Errorf("Failed to list ingresses in namespace %s: %v", ns, err) log.Errorf("Failed to list ingresses in namespace %s: %v", ns, err)
} }
@ -256,7 +270,7 @@ func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip,
return c.updateIngressStatusOld(src, ip, hostname) return c.updateIngressStatusOld(src, ip, hostname)
} }
ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Networking().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) ing, err := c.factoriesIngress[c.lookupNamespace(src.Namespace)].Networking().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name)
if err != nil { if err != nil {
return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err) return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err)
} }
@ -285,7 +299,7 @@ func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip,
} }
func (c *clientWrapper) updateIngressStatusOld(src *networkingv1beta1.Ingress, ip, hostname string) error { func (c *clientWrapper) updateIngressStatusOld(src *networkingv1beta1.Ingress, ip, hostname string) error {
ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Extensions().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) ing, err := c.factoriesIngress[c.lookupNamespace(src.Namespace)].Extensions().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name)
if err != nil { if err != nil {
return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err) return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err)
} }
@ -378,25 +392,6 @@ func (c *clientWrapper) lookupNamespace(ns string) string {
return ns return ns
} }
func (c *clientWrapper) newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler {
return &cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
// Ignore Ingresses that do not match our custom label selector.
switch v := obj.(type) {
case *extensionsv1beta1.Ingress:
lbls := labels.Set(v.GetLabels())
return c.ingressLabelSelector.Matches(lbls)
case *networkingv1beta1.Ingress:
lbls := labels.Set(v.GetLabels())
return c.ingressLabelSelector.Matches(lbls)
default:
return true
}
},
Handler: &resourceEventHandler{ev: events},
}
}
// GetServerVersion returns the cluster server version, or an error. // GetServerVersion returns the cluster server version, or an error.
func (c *clientWrapper) GetServerVersion() (*version.Version, error) { func (c *clientWrapper) GetServerVersion() (*version.Version, error) {
serverVersion, err := c.clientset.Discovery().ServerVersion() serverVersion, err := c.clientset.Discovery().ServerVersion()

View file

@ -53,15 +53,15 @@ type EndpointIngress struct {
PublishedService string `description:"Published Kubernetes Service to copy status from." json:"publishedService,omitempty" toml:"publishedService,omitempty" yaml:"publishedService,omitempty"` PublishedService string `description:"Published Kubernetes Service to copy status from." json:"publishedService,omitempty" toml:"publishedService,omitempty" yaml:"publishedService,omitempty"`
} }
func (p *Provider) newK8sClient(ctx context.Context, ingressLabelSelector string) (*clientWrapper, error) { func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) {
ingLabelSel, err := labels.Parse(ingressLabelSelector) _, err := labels.Parse(p.LabelSelector)
if err != nil { if err != nil {
return nil, fmt.Errorf("invalid ingress label selector: %q", ingressLabelSelector) return nil, fmt.Errorf("invalid ingress label selector: %q", p.LabelSelector)
} }
logger := log.FromContext(ctx) logger := log.FromContext(ctx)
logger.Infof("ingress label selector is: %q", ingLabelSel) logger.Infof("ingress label selector is: %q", p.LabelSelector)
withEndpoint := "" withEndpoint := ""
if p.Endpoint != "" { if p.Endpoint != "" {
@ -81,11 +81,12 @@ func (p *Provider) newK8sClient(ctx context.Context, ingressLabelSelector string
cl, err = newExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath) cl, err = newExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath)
} }
if err == nil { if err != nil {
cl.ingressLabelSelector = ingLabelSel return nil, err
} }
return cl, err cl.ingressLabelSelector = p.LabelSelector
return cl, nil
} }
// Init the provider. // Init the provider.
@ -99,8 +100,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
ctxLog := log.With(context.Background(), log.Str(log.ProviderName, "kubernetes")) ctxLog := log.With(context.Background(), log.Str(log.ProviderName, "kubernetes"))
logger := log.FromContext(ctxLog) logger := log.FromContext(ctxLog)
logger.Debugf("Using Ingress label selector: %q", p.LabelSelector) k8sClient, err := p.newK8sClient(ctxLog)
k8sClient, err := p.newK8sClient(ctxLog, p.LabelSelector)
if err != nil { if err != nil {
return err return err
} }