Limit label selector to Ingress factory.
This commit is contained in:
parent
7109910f46
commit
702876ae7f
3 changed files with 50 additions and 32 deletions
|
@ -39,7 +39,7 @@ func (reh *resourceEventHandler) OnDelete(obj interface{}) {
|
||||||
// WatchAll starts the watch of the Provider resources and updates the stores.
|
// WatchAll starts the watch of the Provider resources and updates the stores.
|
||||||
// The stores can then be accessed via the Get* functions.
|
// The stores can then be accessed via the Get* functions.
|
||||||
type Client interface {
|
type Client interface {
|
||||||
WatchAll(namespaces Namespaces, labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error)
|
WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error)
|
||||||
GetIngresses() []*extensionsv1beta1.Ingress
|
GetIngresses() []*extensionsv1beta1.Ingress
|
||||||
GetService(namespace, name string) (*corev1.Service, bool, error)
|
GetService(namespace, name string) (*corev1.Service, bool, error)
|
||||||
GetSecret(namespace, name string) (*corev1.Secret, bool, error)
|
GetSecret(namespace, name string) (*corev1.Secret, bool, error)
|
||||||
|
@ -47,21 +47,22 @@ type Client interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientImpl struct {
|
type clientImpl struct {
|
||||||
clientset *kubernetes.Clientset
|
clientset *kubernetes.Clientset
|
||||||
factories map[string]informers.SharedInformerFactory
|
factories map[string]informers.SharedInformerFactory
|
||||||
isNamespaceAll bool
|
ingressLabelSelector labels.Selector
|
||||||
|
isNamespaceAll bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newClientImpl(clientset *kubernetes.Clientset) Client {
|
func newClientImpl(clientset *kubernetes.Clientset) *clientImpl {
|
||||||
return &clientImpl{
|
return &clientImpl{
|
||||||
clientset: clientset,
|
clientset: clientset,
|
||||||
factories: make(map[string]informers.SharedInformerFactory),
|
factories: make(map[string]informers.SharedInformerFactory),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInClusterClient returns a new Provider client that is expected to run
|
// newInClusterClient returns a new Provider client that is expected to run
|
||||||
// inside the cluster.
|
// inside the cluster.
|
||||||
func NewInClusterClient(endpoint string) (Client, error) {
|
func newInClusterClient(endpoint string) (*clientImpl, error) {
|
||||||
config, err := rest.InClusterConfig()
|
config, err := rest.InClusterConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create in-cluster configuration: %s", err)
|
return nil, fmt.Errorf("failed to create in-cluster configuration: %s", err)
|
||||||
|
@ -74,10 +75,10 @@ func NewInClusterClient(endpoint string) (Client, error) {
|
||||||
return createClientFromConfig(config)
|
return createClientFromConfig(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewExternalClusterClient returns a new Provider client that may run outside
|
// newExternalClusterClient returns a new Provider client that may run outside
|
||||||
// of the cluster.
|
// of the cluster.
|
||||||
// The endpoint parameter must not be empty.
|
// The endpoint parameter must not be empty.
|
||||||
func NewExternalClusterClient(endpoint, token, caFilePath string) (Client, error) {
|
func newExternalClusterClient(endpoint, token, caFilePath string) (*clientImpl, error) {
|
||||||
if endpoint == "" {
|
if endpoint == "" {
|
||||||
return nil, errors.New("endpoint missing for external cluster client")
|
return nil, errors.New("endpoint missing for external cluster client")
|
||||||
}
|
}
|
||||||
|
@ -99,7 +100,7 @@ func NewExternalClusterClient(endpoint, token, caFilePath string) (Client, error
|
||||||
return createClientFromConfig(config)
|
return createClientFromConfig(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createClientFromConfig(c *rest.Config) (Client, error) {
|
func createClientFromConfig(c *rest.Config) (*clientImpl, error) {
|
||||||
clientset, err := kubernetes.NewForConfig(c)
|
clientset, err := kubernetes.NewForConfig(c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -109,24 +110,17 @@ func createClientFromConfig(c *rest.Config) (Client, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchAll starts namespace-specific controllers for all relevant kinds.
|
// WatchAll starts namespace-specific controllers for all relevant kinds.
|
||||||
func (c *clientImpl) WatchAll(namespaces Namespaces, labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) {
|
func (c *clientImpl) WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) {
|
||||||
eventCh := make(chan interface{}, 1)
|
eventCh := make(chan interface{}, 1)
|
||||||
|
|
||||||
_, err := labels.Parse(labelSelector)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(namespaces) == 0 {
|
if len(namespaces) == 0 {
|
||||||
namespaces = Namespaces{metav1.NamespaceAll}
|
namespaces = Namespaces{metav1.NamespaceAll}
|
||||||
c.isNamespaceAll = true
|
c.isNamespaceAll = true
|
||||||
}
|
}
|
||||||
|
|
||||||
eventHandler := newResourceEventHandler(eventCh)
|
eventHandler := c.newResourceEventHandler(eventCh)
|
||||||
for _, ns := range namespaces {
|
for _, ns := range namespaces {
|
||||||
factory := informers.NewFilteredSharedInformerFactory(c.clientset, resyncPeriod, ns, func(opts *metav1.ListOptions) {
|
factory := informers.NewFilteredSharedInformerFactory(c.clientset, resyncPeriod, ns, nil)
|
||||||
opts.LabelSelector = labelSelector
|
|
||||||
})
|
|
||||||
factory.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
|
factory.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
|
||||||
factory.Core().V1().Services().Informer().AddEventHandler(eventHandler)
|
factory.Core().V1().Services().Informer().AddEventHandler(eventHandler)
|
||||||
factory.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler)
|
factory.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler)
|
||||||
|
@ -161,7 +155,7 @@ func (c *clientImpl) WatchAll(namespaces Namespaces, labelSelector string, stopC
|
||||||
func (c *clientImpl) GetIngresses() []*extensionsv1beta1.Ingress {
|
func (c *clientImpl) GetIngresses() []*extensionsv1beta1.Ingress {
|
||||||
var result []*extensionsv1beta1.Ingress
|
var result []*extensionsv1beta1.Ingress
|
||||||
for ns, factory := range c.factories {
|
for ns, factory := range c.factories {
|
||||||
ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(labels.Everything())
|
ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(c.ingressLabelSelector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Failed to list ingresses in namespace %s: %s", ns, err)
|
log.Errorf("Failed to list ingresses in namespace %s: %s", ns, err)
|
||||||
}
|
}
|
||||||
|
@ -215,8 +209,18 @@ func (c *clientImpl) lookupNamespace(ns string) string {
|
||||||
return ns
|
return ns
|
||||||
}
|
}
|
||||||
|
|
||||||
func newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler {
|
func (c *clientImpl) newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler {
|
||||||
return &resourceEventHandler{events}
|
return &cache.FilteringResourceEventHandler{
|
||||||
|
FilterFunc: func(obj interface{}) bool {
|
||||||
|
// Ignore Ingresses that do not match our custom label selector.
|
||||||
|
if ing, ok := obj.(*extensionsv1beta1.Ingress); ok {
|
||||||
|
lbls := labels.Set(ing.GetLabels())
|
||||||
|
return c.ingressLabelSelector.Matches(lbls)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
Handler: &resourceEventHandler{events},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// eventHandlerFunc will pass the obj on to the events channel or drop it.
|
// eventHandlerFunc will pass the obj on to the events channel or drop it.
|
||||||
|
|
|
@ -61,6 +61,6 @@ func (c clientMock) GetSecret(namespace, name string) (*corev1.Secret, bool, err
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clientMock) WatchAll(namespaces Namespaces, labelString string, stopCh <-chan struct{}) (<-chan interface{}, error) {
|
func (c clientMock) WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) {
|
||||||
return c.watchChan, nil
|
return c.watchChan, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
|
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -45,24 +46,37 @@ type Provider struct {
|
||||||
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers" export:"true"`
|
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers" export:"true"`
|
||||||
EnablePassTLSCert bool `description:"Kubernetes enable Pass TLS Client Certs" export:"true"`
|
EnablePassTLSCert bool `description:"Kubernetes enable Pass TLS Client Certs" export:"true"`
|
||||||
Namespaces Namespaces `description:"Kubernetes namespaces" export:"true"`
|
Namespaces Namespaces `description:"Kubernetes namespaces" export:"true"`
|
||||||
LabelSelector string `description:"Kubernetes api label selector to use" export:"true"`
|
LabelSelector string `description:"Kubernetes Ingress label selector to use" export:"true"`
|
||||||
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"`
|
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"`
|
||||||
lastConfiguration safe.Safe
|
lastConfiguration safe.Safe
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) newK8sClient() (Client, error) {
|
func (p *Provider) newK8sClient(ingressLabelSelector string) (Client, error) {
|
||||||
|
ingLabelSel, err := labels.Parse(ingressLabelSelector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid ingress label selector: %q", ingressLabelSelector)
|
||||||
|
}
|
||||||
|
log.Infof("ingress label selector is: %q", ingLabelSel)
|
||||||
|
|
||||||
withEndpoint := ""
|
withEndpoint := ""
|
||||||
if p.Endpoint != "" {
|
if p.Endpoint != "" {
|
||||||
withEndpoint = fmt.Sprintf(" with endpoint %v", p.Endpoint)
|
withEndpoint = fmt.Sprintf(" with endpoint %v", p.Endpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var cl *clientImpl
|
||||||
if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
|
if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
|
||||||
log.Infof("Creating in-cluster Provider client%s", withEndpoint)
|
log.Infof("Creating in-cluster Provider client%s", withEndpoint)
|
||||||
return NewInClusterClient(p.Endpoint)
|
cl, err = newInClusterClient(p.Endpoint)
|
||||||
|
} else {
|
||||||
|
log.Infof("Creating cluster-external Provider client%s", withEndpoint)
|
||||||
|
cl, err = newExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Creating cluster-external Provider client%s", withEndpoint)
|
if err == nil {
|
||||||
return NewExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath)
|
cl.ingressLabelSelector = ingLabelSel
|
||||||
|
}
|
||||||
|
|
||||||
|
return cl, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Provide allows the k8s provider to provide configurations to traefik
|
// Provide allows the k8s provider to provide configurations to traefik
|
||||||
|
@ -83,7 +97,8 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
|
||||||
return fmt.Errorf("value for IngressClass has to be empty or start with the prefix %q, instead found %q", traefikDefaultIngressClass, p.IngressClass)
|
return fmt.Errorf("value for IngressClass has to be empty or start with the prefix %q, instead found %q", traefikDefaultIngressClass, p.IngressClass)
|
||||||
}
|
}
|
||||||
|
|
||||||
k8sClient, err := p.newK8sClient()
|
log.Debugf("Using Ingress label selector: %q", p.LabelSelector)
|
||||||
|
k8sClient, err := p.newK8sClient(p.LabelSelector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -94,8 +109,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
|
||||||
for {
|
for {
|
||||||
stopWatch := make(chan struct{}, 1)
|
stopWatch := make(chan struct{}, 1)
|
||||||
defer close(stopWatch)
|
defer close(stopWatch)
|
||||||
log.Debugf("Using label selector: '%s'", p.LabelSelector)
|
eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch)
|
||||||
eventsChan, err := k8sClient.WatchAll(p.Namespaces, p.LabelSelector, stopWatch)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error watching kubernetes events: %v", err)
|
log.Errorf("Error watching kubernetes events: %v", err)
|
||||||
timer := time.NewTimer(1 * time.Second)
|
timer := time.NewTimer(1 * time.Second)
|
||||||
|
|
Loading…
Add table
Reference in a new issue