From 87eac1dc1a65dc3a5dc9f4686736c55a4770904b Mon Sep 17 00:00:00 2001
From: Phil Kates
Date: Fri, 2 Dec 2016 11:35:19 -0800
Subject: [PATCH 1/2] Fix deadlock in k8s provider

On a reasonably sized cluster:

    63 nodes
    87 services
    90 endpoints

the initialization of the k8s provider would hang.

I tracked this down to the ResourceEventHandlerFuncs. Once you reach
the channel buffer size (10) the k8s Informer gets stuck: you can't
read from or write to the channel anymore. I think this is probably a
lock issue somewhere in k8s, but the more reasonable solution for the
traefik use case is to just drop events when the queue is full. We only
use the events for signalling, not for their content, so dropping an
event doesn't matter.
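
For reference, the fix is Go's standard non-blocking send idiom. A
minimal standalone sketch of the pattern (illustrative names, not the
traefik code itself):

    package main

    import "fmt"

    // trySend delivers v if the channel has room and reports whether
    // it was delivered. A plain `ch <- v` would block forever once the
    // buffer is full and no reader is draining the channel.
    func trySend(ch chan<- int, v int) bool {
        select {
        case ch <- v:
            return true
        default: // buffer full: drop the value instead of blocking
            return false
        }
    }

    func main() {
        ch := make(chan int, 2) // buffered channel, nobody reading yet
        for i := 0; i < 4; i++ {
            fmt.Printf("value %d delivered: %v\n", i, trySend(ch, i))
        }
        // values 0 and 1 fill the buffer; 2 and 3 are dropped
    }

Because consumers treat any event as "something changed" rather than
inspecting its content, a dropped event is effectively coalesced into
the one still sitting in the buffer.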
---
 provider/k8s/client.go | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/provider/k8s/client.go b/provider/k8s/client.go
index 14420b74f..0f44e5e31 100644
--- a/provider/k8s/client.go
+++ b/provider/k8s/client.go
@@ -110,14 +110,22 @@ func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan
 	return watchCh
 }
 
-func newResourceEventHandlerFuncs(events chan interface{}) cache.ResourceEventHandlerFuncs {
-
-	return cache.ResourceEventHandlerFuncs{
-		AddFunc:    func(obj interface{}) { events <- obj },
-		UpdateFunc: func(old, new interface{}) { events <- new },
-		DeleteFunc: func(obj interface{}) { events <- obj },
+// eventHandlerFunc will pass the obj on to the events channel or drop it
+// This is so passing the events along won't block in the case of high volume
+// The events are only used for signalling anyway so dropping a few is ok
+func eventHandlerFunc(events chan interface{}, obj interface{}) {
+	select {
+	case events <- obj:
+	default:
 	}
+}
 
+func newResourceEventHandlerFuncs(events chan interface{}) cache.ResourceEventHandlerFuncs {
+	return cache.ResourceEventHandlerFuncs{
+		AddFunc:    func(obj interface{}) { eventHandlerFunc(events, obj) },
+		UpdateFunc: func(old, new interface{}) { eventHandlerFunc(events, new) },
+		DeleteFunc: func(obj interface{}) { eventHandlerFunc(events, obj) },
+	}
 }
 
 // GetService returns the named service from the named namespace

From fc788eb426dbd81334db2842556ecf22146f34a0 Mon Sep 17 00:00:00 2001
From: Yves Peter
Date: Sat, 3 Dec 2016 21:20:39 +0100
Subject: [PATCH 2/2] Kubernetes: cleanup channel handling

Only use one channel for all watches.
Re-use the stop channel from the provider.
Skip events that have already been handled by the provider.

Builds on 007f8cc48ea9504bb7754c5e3244124be422f47d.
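
The resulting shape is a single buffered channel that every informer
writes to non-blockingly, with the provider's stop channel threaded
through to all of them. A rough standalone sketch (toy producers stand
in for the real k8s informers):

    package main

    import "fmt"

    // watch stands in for one informer: each resource type pushes its
    // signals onto the same shared events channel without blocking.
    func watch(name string, events chan<- interface{}, stopCh <-chan struct{}) {
        go func() {
            for {
                select {
                case <-stopCh: // one stop channel is reused by every watch
                    return
                case events <- name:
                default: // buffer full: drop it, events are only signals
                }
            }
        }()
    }

    func main() {
        stopCh := make(chan struct{})
        events := make(chan interface{}, 1) // one channel for all watches

        watch("ingresses", events, stopCh)
        watch("services", events, stopCh)
        watch("endpoints", events, stopCh)

        for i := 0; i < 5; i++ {
            fmt.Println("signal from", <-events)
        }
        close(stopCh) // a single close stops all three watches
    }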
---
 provider/k8s/client.go      | 51 +++++++++++++------------------
 provider/kubernetes.go      |  3 +--
 provider/kubernetes_test.go |  6 +----
 3 files changed, 20 insertions(+), 40 deletions(-)

diff --git a/provider/k8s/client.go b/provider/k8s/client.go
index 0f44e5e31..d4806d870 100644
--- a/provider/k8s/client.go
+++ b/provider/k8s/client.go
@@ -24,7 +24,7 @@ type Client interface {
 	GetIngresses(namespaces Namespaces) []*v1beta1.Ingress
 	GetService(namespace, name string) (*v1.Service, bool, error)
 	GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error)
-	WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error)
+	WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error)
 }
 
 type clientImpl struct {
@@ -90,9 +90,7 @@ func (c *clientImpl) GetIngresses(namespaces Namespaces) []*v1beta1.Ingress {
 }
 
 // WatchIngresses starts the watch of Kubernetes Ingresses resources and updates the corresponding store
-func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan struct{}) chan interface{} {
-	watchCh := make(chan interface{}, 10)
-
+func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, watchCh chan<- interface{}, stopCh <-chan struct{}) {
 	source := NewListWatchFromClient(
 		c.clientset.ExtensionsClient,
 		"ingresses",
@@ -106,21 +104,19 @@ func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan
 		resyncPeriod,
 		newResourceEventHandlerFuncs(watchCh))
 	go c.ingController.Run(stopCh)
-
-	return watchCh
 }
 
 // eventHandlerFunc will pass the obj on to the events channel or drop it
 // This is so passing the events along won't block in the case of high volume
 // The events are only used for signalling anyway so dropping a few is ok
-func eventHandlerFunc(events chan interface{}, obj interface{}) {
+func eventHandlerFunc(events chan<- interface{}, obj interface{}) {
 	select {
 	case events <- obj:
 	default:
 	}
 }
 
-func newResourceEventHandlerFuncs(events chan interface{}) cache.ResourceEventHandlerFuncs {
+func newResourceEventHandlerFuncs(events chan<- interface{}) cache.ResourceEventHandlerFuncs {
 	return cache.ResourceEventHandlerFuncs{
 		AddFunc:    func(obj interface{}) { eventHandlerFunc(events, obj) },
 		UpdateFunc: func(old, new interface{}) { eventHandlerFunc(events, new) },
 		DeleteFunc: func(obj interface{}) { eventHandlerFunc(events, obj) },
@@ -140,9 +136,7 @@ func (c *clientImpl) GetService(namespace, name string) (*v1.Service, bool, erro
 }
 
 // WatchServices starts the watch of Kubernetes Service resources and updates the corresponding store
-func (c *clientImpl) WatchServices(stopCh <-chan struct{}) chan interface{} {
-	watchCh := make(chan interface{}, 10)
-
+func (c *clientImpl) WatchServices(watchCh chan<- interface{}, stopCh <-chan struct{}) {
 	source := cache.NewListWatchFromClient(
 		c.clientset.CoreClient,
 		"services",
@@ -155,8 +149,6 @@ func (c *clientImpl) WatchServices(stopCh <-chan struct{}) chan interface{} {
 		resyncPeriod,
 		newResourceEventHandlerFuncs(watchCh))
 	go c.svcController.Run(stopCh)
-
-	return watchCh
 }
 
 // GetEndpoints returns the named Endpoints
@@ -173,9 +165,7 @@ func (c *clientImpl) GetEndpoints(namespace, name string) (*v1.Endpoints, bool,
 }
 
 // WatchEndpoints starts the watch of Kubernetes Endpoints resources and updates the corresponding store
-func (c *clientImpl) WatchEndpoints(stopCh <-chan struct{}) chan interface{} {
-	watchCh := make(chan interface{}, 10)
-
+func (c *clientImpl) WatchEndpoints(watchCh chan<- interface{}, stopCh <-chan struct{}) {
 	source := cache.NewListWatchFromClient(
 		c.clientset.CoreClient,
 		"endpoints",
@@ -188,38 +178,32 @@ func (c *clientImpl) WatchEndpoints(stopCh <-chan struct{}) chan interface{} {
 		resyncPeriod,
 		newResourceEventHandlerFuncs(watchCh))
 	go c.epController.Run(stopCh)
-
-	return watchCh
 }
 
 // WatchAll returns events in the cluster and updates the stores via informer
 // Filters ingresses by labelSelector
-func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error) {
-	watchCh := make(chan interface{}, 100)
-	stopWatchCh := make(chan struct{}, 1)
+func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) {
+	watchCh := make(chan interface{}, 1)
+	eventCh := make(chan interface{}, 1)
 
 	kubeLabelSelector, err := labels.Parse(labelSelector)
 	if err != nil {
 		return nil, err
 	}
 
-	chanIngresses := c.WatchIngresses(kubeLabelSelector, stopWatchCh)
-	chanServices := c.WatchServices(stopWatchCh)
-	chanEndpoints := c.WatchEndpoints(stopWatchCh)
+	c.WatchIngresses(kubeLabelSelector, eventCh, stopCh)
+	c.WatchServices(eventCh, stopCh)
+	c.WatchEndpoints(eventCh, stopCh)
 
 	go func() {
-		defer close(stopWatchCh)
 		defer close(watchCh)
+		defer close(eventCh)
 
 		for {
 			select {
			case <-stopCh:
 				return
-			case event := <-chanIngresses:
-				c.fireEvent(event, watchCh)
-			case event := <-chanServices:
-				c.fireEvent(event, watchCh)
-			case event := <-chanEndpoints:
+			case event := <-eventCh:
 				c.fireEvent(event, watchCh)
 			}
 		}
@@ -230,10 +214,11 @@ func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan in
 
 // fireEvent checks if all controllers have synced before firing
 // Used after startup or a reconnect
-func (c *clientImpl) fireEvent(event interface{}, watchCh chan interface{}) {
-	if c.ingController.HasSynced() && c.svcController.HasSynced() && c.epController.HasSynced() {
-		watchCh <- event
+func (c *clientImpl) fireEvent(event interface{}, eventCh chan interface{}) {
+	if !c.ingController.HasSynced() || !c.svcController.HasSynced() || !c.epController.HasSynced() {
+		return
 	}
+	eventHandlerFunc(eventCh, event)
 }
 
 // HasNamespace checks if the ingress is in one of the namespaces
diff --git a/provider/kubernetes.go b/provider/kubernetes.go
index af01db13c..1752cf686 100644
--- a/provider/kubernetes.go
+++ b/provider/kubernetes.go
@@ -52,7 +52,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
 	pool.Go(func(stop chan bool) {
 		operation := func() error {
 			for {
-				stopWatch := make(chan bool, 5)
+				stopWatch := make(chan struct{}, 1)
 				defer close(stopWatch)
 				log.Debugf("Using label selector: '%s'", provider.LabelSelector)
 				eventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
@@ -69,7 +69,6 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
 			for {
 				select {
 				case <-stop:
-					stopWatch <- true
 					return nil
 				case event := <-eventsChan:
 					log.Debugf("Received event from kubernetes %+v", event)
diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go
index 4148a72c2..7b87da941 100644
--- a/provider/kubernetes_test.go
+++ b/provider/kubernetes_test.go
@@ -1277,10 +1277,6 @@ func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress {
 	return result
 }
 
-func (c clientMock) WatchIngresses(labelSelector string, stopCh <-chan struct{}) chan interface{} {
-	return c.watchChan
-}
-
 func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error) {
 	for _, service := range c.services {
 		if service.Namespace == namespace && service.Name == name {
@@ -1299,6 +1295,6 @@ func (c clientMock) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, e
 	return &v1.Endpoints{}, true, nil
 }
 
-func (c clientMock) WatchAll(labelString string, stopCh <-chan bool) (chan interface{}, error) {
+func (c clientMock) WatchAll(labelString string, stopCh <-chan struct{}) (<-chan interface{}, error) {
 	return c.watchChan, nil
 }
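
A side note on the stopWatch change in provider/kubernetes.go above: the
old code had to send `stopWatch <- true` because a value sent on a
channel wakes at most one receiver, whereas closing the channel is
observed by every goroutine blocked on it. With the informers now
sharing the provider's stop channel, the deferred close is the whole
shutdown story. A small self-contained illustration of that semantic
(toy watcher names, not the traefik code):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        stopCh := make(chan struct{})
        var wg sync.WaitGroup

        for _, name := range []string{"ingresses", "services", "endpoints"} {
            wg.Add(1)
            go func(name string) {
                defer wg.Done()
                <-stopCh // every watcher unblocks on the same close
                fmt.Println(name, "watcher stopped")
            }(name)
        }

        close(stopCh) // one close broadcasts stop to all watchers
        wg.Wait()
    }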