diff --git a/pkg/provider/kubernetes/crd/client.go b/pkg/provider/kubernetes/crd/client.go index 78e22e4cc..26f045d54 100644 --- a/pkg/provider/kubernetes/crd/client.go +++ b/pkg/provider/kubernetes/crd/client.go @@ -61,11 +61,12 @@ type Client interface { // TODO: add tests for the clientWrapper (and its methods) itself. type clientWrapper struct { - csCrd *versioned.Clientset - csKube *kubernetes.Clientset + csCrd versioned.Interface + csKube kubernetes.Interface - factoriesCrd map[string]externalversions.SharedInformerFactory - factoriesKube map[string]informers.SharedInformerFactory + factoriesCrd map[string]externalversions.SharedInformerFactory + factoriesKube map[string]informers.SharedInformerFactory + factoriesSecret map[string]informers.SharedInformerFactory labelSelector labels.Selector @@ -87,12 +88,13 @@ func createClientFromConfig(c *rest.Config) (*clientWrapper, error) { return newClientImpl(csKube, csCrd), nil } -func newClientImpl(csKube *kubernetes.Clientset, csCrd *versioned.Clientset) *clientWrapper { +func newClientImpl(csKube kubernetes.Interface, csCrd versioned.Interface) *clientWrapper { return &clientWrapper{ - csCrd: csCrd, - csKube: csKube, - factoriesCrd: make(map[string]externalversions.SharedInformerFactory), - factoriesKube: make(map[string]informers.SharedInformerFactory), + csCrd: csCrd, + csKube: csKube, + factoriesCrd: make(map[string]externalversions.SharedInformerFactory), + factoriesKube: make(map[string]informers.SharedInformerFactory), + factoriesSecret: make(map[string]informers.SharedInformerFactory), } } @@ -155,6 +157,10 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< } c.watchedNamespaces = namespaces + notOwnedByHelm := func(opts *metav1.ListOptions) { + opts.LabelSelector = "owner!=helm" + } + for _, ns := range namespaces { factoryCrd := externalversions.NewSharedInformerFactoryWithOptions(c.csCrd, resyncPeriod, externalversions.WithNamespace(ns)) factoryCrd.Traefik().V1alpha1().IngressRoutes().Informer().AddEventHandler(eventHandler) @@ -169,15 +175,19 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< factoryKube.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler) factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler) - factoryKube.Core().V1().Secrets().Informer().AddEventHandler(eventHandler) + + factorySecret := informers.NewSharedInformerFactoryWithOptions(c.csKube, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOptions(notOwnedByHelm)) + factorySecret.Core().V1().Secrets().Informer().AddEventHandler(eventHandler) c.factoriesCrd[ns] = factoryCrd c.factoriesKube[ns] = factoryKube + c.factoriesSecret[ns] = factorySecret } for _, ns := range namespaces { c.factoriesCrd[ns].Start(stopCh) c.factoriesKube[ns].Start(stopCh) + c.factoriesSecret[ns].Start(stopCh) } for _, ns := range namespaces { @@ -192,6 +202,12 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", t.String(), ns) } } + + for t, ok := range c.factoriesSecret[ns].WaitForCacheSync(stopCh) { + if !ok { + return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", t.String(), ns) + } + } } return eventCh, nil @@ -337,7 +353,7 @@ func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool, return nil, false, fmt.Errorf("failed to get secret %s/%s: namespace is not within watched namespaces", namespace, name) } - secret, err := c.factoriesKube[c.lookupNamespace(namespace)].Core().V1().Secrets().Lister().Secrets(namespace).Get(name) + secret, err := c.factoriesSecret[c.lookupNamespace(namespace)].Core().V1().Secrets().Lister().Secrets(namespace).Get(name) exist, err := translateNotFoundError(err) return secret, exist, err } diff --git a/pkg/provider/kubernetes/crd/client_test.go b/pkg/provider/kubernetes/crd/client_test.go new file mode 100644 index 000000000..a4113b56f --- /dev/null +++ b/pkg/provider/kubernetes/crd/client_test.go @@ -0,0 +1,63 @@ +package crd + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + crdfake "github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/generated/clientset/versioned/fake" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kubefake "k8s.io/client-go/kubernetes/fake" +) + +func TestClientIgnoresHelmOwnedSecrets(t *testing.T) { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "secret", + }, + } + helmSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "helm-secret", + Labels: map[string]string{ + "owner": "helm", + }, + }, + } + + kubeClient := kubefake.NewSimpleClientset(helmSecret, secret) + crdClient := crdfake.NewSimpleClientset() + + client := newClientImpl(kubeClient, crdClient) + + eventCh, err := client.WatchAll(nil, nil) + require.NoError(t, err) + + select { + case event := <-eventCh: + secret, ok := event.(*corev1.Secret) + require.True(t, ok) + + assert.NotEqual(t, "helm-secret", secret.Name) + case <-time.After(50 * time.Millisecond): + assert.Fail(t, "expected to receive event for secret") + } + + select { + case <-eventCh: + assert.Fail(t, "received more than one event") + case <-time.After(50 * time.Millisecond): + } + + _, found, err := client.GetSecret("default", "secret") + require.NoError(t, err) + assert.True(t, found) + + _, found, err = client.GetSecret("default", "helm-secret") + require.NoError(t, err) + assert.False(t, found) +} diff --git a/pkg/provider/kubernetes/ingress/client.go b/pkg/provider/kubernetes/ingress/client.go index 0012465e5..e54887e75 100644 --- a/pkg/provider/kubernetes/ingress/client.go +++ b/pkg/provider/kubernetes/ingress/client.go @@ -59,8 +59,9 @@ type Client interface { } type clientWrapper struct { - clientset *kubernetes.Clientset - factories map[string]informers.SharedInformerFactory + clientset kubernetes.Interface + factoriesKube map[string]informers.SharedInformerFactory + factoriesSecret map[string]informers.SharedInformerFactory clusterFactory informers.SharedInformerFactory ingressLabelSelector labels.Selector isNamespaceAll bool @@ -123,10 +124,11 @@ func createClientFromConfig(c *rest.Config) (*clientWrapper, error) { return newClientImpl(clientset), nil } -func newClientImpl(clientset *kubernetes.Clientset) *clientWrapper { +func newClientImpl(clientset kubernetes.Interface) *clientWrapper { return &clientWrapper{ - clientset: clientset, - factories: make(map[string]informers.SharedInformerFactory), + clientset: clientset, + factoriesKube: make(map[string]informers.SharedInformerFactory), + factoriesSecret: make(map[string]informers.SharedInformerFactory), } } @@ -142,21 +144,36 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< c.watchedNamespaces = namespaces - for _, ns := range namespaces { - factory := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns)) - factory.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler) - factory.Core().V1().Services().Informer().AddEventHandler(eventHandler) - factory.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler) - factory.Core().V1().Secrets().Informer().AddEventHandler(eventHandler) - c.factories[ns] = factory + notOwnedByHelm := func(opts *metav1.ListOptions) { + opts.LabelSelector = "owner!=helm" } for _, ns := range namespaces { - c.factories[ns].Start(stopCh) + factoryKube := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns)) + factoryKube.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler) + factoryKube.Core().V1().Services().Informer().AddEventHandler(eventHandler) + factoryKube.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler) + + factorySecret := informers.NewSharedInformerFactoryWithOptions(c.clientset, resyncPeriod, informers.WithNamespace(ns), informers.WithTweakListOptions(notOwnedByHelm)) + factorySecret.Core().V1().Secrets().Informer().AddEventHandler(eventHandler) + + c.factoriesKube[ns] = factoryKube + c.factoriesSecret[ns] = factorySecret } for _, ns := range namespaces { - for typ, ok := range c.factories[ns].WaitForCacheSync(stopCh) { + c.factoriesKube[ns].Start(stopCh) + c.factoriesSecret[ns].Start(stopCh) + } + + for _, ns := range namespaces { + for typ, ok := range c.factoriesKube[ns].WaitForCacheSync(stopCh) { + if !ok { + return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", typ, ns) + } + } + + for typ, ok := range c.factoriesSecret[ns].WaitForCacheSync(stopCh) { if !ok { return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", typ, ns) } @@ -188,7 +205,7 @@ func (c *clientWrapper) WatchAll(namespaces []string, stopCh <-chan struct{}) (< func (c *clientWrapper) GetIngresses() []*networkingv1beta1.Ingress { var results []*networkingv1beta1.Ingress - for ns, factory := range c.factories { + for ns, factory := range c.factoriesKube { // extensions ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(c.ingressLabelSelector) if err != nil { @@ -239,7 +256,7 @@ func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip, return c.updateIngressStatusOld(src, ip, hostname) } - ing, err := c.factories[c.lookupNamespace(src.Namespace)].Networking().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) + ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Networking().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) if err != nil { return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err) } @@ -268,7 +285,7 @@ func (c *clientWrapper) UpdateIngressStatus(src *networkingv1beta1.Ingress, ip, } func (c *clientWrapper) updateIngressStatusOld(src *networkingv1beta1.Ingress, ip, hostname string) error { - ing, err := c.factories[c.lookupNamespace(src.Namespace)].Extensions().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) + ing, err := c.factoriesKube[c.lookupNamespace(src.Namespace)].Extensions().V1beta1().Ingresses().Lister().Ingresses(src.Namespace).Get(src.Name) if err != nil { return fmt.Errorf("failed to get ingress %s/%s: %w", src.Namespace, src.Name, err) } @@ -302,7 +319,7 @@ func (c *clientWrapper) GetService(namespace, name string) (*corev1.Service, boo return nil, false, fmt.Errorf("failed to get service %s/%s: namespace is not within watched namespaces", namespace, name) } - service, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Services().Lister().Services(namespace).Get(name) + service, err := c.factoriesKube[c.lookupNamespace(namespace)].Core().V1().Services().Lister().Services(namespace).Get(name) exist, err := translateNotFoundError(err) return service, exist, err } @@ -313,7 +330,7 @@ func (c *clientWrapper) GetEndpoints(namespace, name string) (*corev1.Endpoints, return nil, false, fmt.Errorf("failed to get endpoints %s/%s: namespace is not within watched namespaces", namespace, name) } - endpoint, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Endpoints().Lister().Endpoints(namespace).Get(name) + endpoint, err := c.factoriesKube[c.lookupNamespace(namespace)].Core().V1().Endpoints().Lister().Endpoints(namespace).Get(name) exist, err := translateNotFoundError(err) return endpoint, exist, err } @@ -324,7 +341,7 @@ func (c *clientWrapper) GetSecret(namespace, name string) (*corev1.Secret, bool, return nil, false, fmt.Errorf("failed to get secret %s/%s: namespace is not within watched namespaces", namespace, name) } - secret, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Secrets().Lister().Secrets(namespace).Get(name) + secret, err := c.factoriesSecret[c.lookupNamespace(namespace)].Core().V1().Secrets().Lister().Secrets(namespace).Get(name) exist, err := translateNotFoundError(err) return secret, exist, err } diff --git a/pkg/provider/kubernetes/ingress/client_test.go b/pkg/provider/kubernetes/ingress/client_test.go index f582502a7..20173200d 100644 --- a/pkg/provider/kubernetes/ingress/client_test.go +++ b/pkg/provider/kubernetes/ingress/client_test.go @@ -3,10 +3,15 @@ package ingress import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" kubeerror "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + kubefake "k8s.io/client-go/kubernetes/fake" ) func TestTranslateNotFoundError(t *testing.T) { @@ -47,3 +52,54 @@ func TestTranslateNotFoundError(t *testing.T) { }) } } + +func TestClientIgnoresHelmOwnedSecrets(t *testing.T) { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "secret", + }, + } + helmSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "helm-secret", + Labels: map[string]string{ + "owner": "helm", + }, + }, + } + + kubeClient := kubefake.NewSimpleClientset(helmSecret, secret) + + client := newClientImpl(kubeClient) + + stopCh := make(chan struct{}) + + eventCh, err := client.WatchAll(nil, stopCh) + require.NoError(t, err) + + select { + case event := <-eventCh: + secret, ok := event.(*corev1.Secret) + require.True(t, ok) + + assert.NotEqual(t, "helm-secret", secret.Name) + case <-time.After(50 * time.Millisecond): + assert.Fail(t, "expected to receive event for secret") + } + + select { + case <-eventCh: + assert.Fail(t, "received more than one event") + case <-time.After(50 * time.Millisecond): + } + + _, found, err := client.GetSecret("default", "secret") + require.NoError(t, err) + assert.True(t, found) + + _, found, err = client.GetSecret("default", "helm-secret") + require.NoError(t, err) + assert.False(t, found) +}