diff --git a/provider/k8s/client.go b/provider/k8s/client.go index 14420b74f..d4806d870 100644 --- a/provider/k8s/client.go +++ b/provider/k8s/client.go @@ -24,7 +24,7 @@ type Client interface { GetIngresses(namespaces Namespaces) []*v1beta1.Ingress GetService(namespace, name string) (*v1.Service, bool, error) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error) - WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error) + WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) } type clientImpl struct { @@ -90,9 +90,7 @@ func (c *clientImpl) GetIngresses(namespaces Namespaces) []*v1beta1.Ingress { } // WatchIngresses starts the watch of Kubernetes Ingresses resources and updates the corresponding store -func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan struct{}) chan interface{} { - watchCh := make(chan interface{}, 10) - +func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, watchCh chan<- interface{}, stopCh <-chan struct{}) { source := NewListWatchFromClient( c.clientset.ExtensionsClient, "ingresses", @@ -106,18 +104,24 @@ func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan resyncPeriod, newResourceEventHandlerFuncs(watchCh)) go c.ingController.Run(stopCh) - - return watchCh } -func newResourceEventHandlerFuncs(events chan interface{}) cache.ResourceEventHandlerFuncs { - - return cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { events <- obj }, - UpdateFunc: func(old, new interface{}) { events <- new }, - DeleteFunc: func(obj interface{}) { events <- obj }, +// eventHandlerFunc will pass the obj on to the events channel or drop it +// This is so passing the events along won't block in the case of high volume +// The events are only used for signalling anyway so dropping a few is ok +func eventHandlerFunc(events chan<- interface{}, obj interface{}) { + select { + case events <- obj: + default: } +} +func newResourceEventHandlerFuncs(events chan<- interface{}) cache.ResourceEventHandlerFuncs { + return cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { eventHandlerFunc(events, obj) }, + UpdateFunc: func(old, new interface{}) { eventHandlerFunc(events, new) }, + DeleteFunc: func(obj interface{}) { eventHandlerFunc(events, obj) }, + } } // GetService returns the named service from the named namespace @@ -132,9 +136,7 @@ func (c *clientImpl) GetService(namespace, name string) (*v1.Service, bool, erro } // WatchServices starts the watch of Kubernetes Service resources and updates the corresponding store -func (c *clientImpl) WatchServices(stopCh <-chan struct{}) chan interface{} { - watchCh := make(chan interface{}, 10) - +func (c *clientImpl) WatchServices(watchCh chan<- interface{}, stopCh <-chan struct{}) { source := cache.NewListWatchFromClient( c.clientset.CoreClient, "services", @@ -147,8 +149,6 @@ func (c *clientImpl) WatchServices(stopCh <-chan struct{}) chan interface{} { resyncPeriod, newResourceEventHandlerFuncs(watchCh)) go c.svcController.Run(stopCh) - - return watchCh } // GetEndpoints returns the named Endpoints @@ -165,9 +165,7 @@ func (c *clientImpl) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, } // WatchEndpoints starts the watch of Kubernetes Endpoints resources and updates the corresponding store -func (c *clientImpl) WatchEndpoints(stopCh <-chan struct{}) chan interface{} { - watchCh := make(chan interface{}, 10) - +func (c *clientImpl) WatchEndpoints(watchCh chan<- interface{}, stopCh <-chan struct{}) { source := cache.NewListWatchFromClient( c.clientset.CoreClient, "endpoints", @@ -180,38 +178,32 @@ func (c *clientImpl) WatchEndpoints(stopCh <-chan struct{}) chan interface{} { resyncPeriod, newResourceEventHandlerFuncs(watchCh)) go c.epController.Run(stopCh) - - return watchCh } // WatchAll returns events in the cluster and updates the stores via informer // Filters ingresses by labelSelector -func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error) { - watchCh := make(chan interface{}, 100) - stopWatchCh := make(chan struct{}, 1) +func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) { + watchCh := make(chan interface{}, 1) + eventCh := make(chan interface{}, 1) kubeLabelSelector, err := labels.Parse(labelSelector) if err != nil { return nil, err } - chanIngresses := c.WatchIngresses(kubeLabelSelector, stopWatchCh) - chanServices := c.WatchServices(stopWatchCh) - chanEndpoints := c.WatchEndpoints(stopWatchCh) + c.WatchIngresses(kubeLabelSelector, eventCh, stopCh) + c.WatchServices(eventCh, stopCh) + c.WatchEndpoints(eventCh, stopCh) go func() { - defer close(stopWatchCh) defer close(watchCh) + defer close(eventCh) for { select { case <-stopCh: return - case event := <-chanIngresses: - c.fireEvent(event, watchCh) - case event := <-chanServices: - c.fireEvent(event, watchCh) - case event := <-chanEndpoints: + case event := <-eventCh: c.fireEvent(event, watchCh) } } @@ -222,10 +214,11 @@ func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan in // fireEvent checks if all controllers have synced before firing // Used after startup or a reconnect -func (c *clientImpl) fireEvent(event interface{}, watchCh chan interface{}) { - if c.ingController.HasSynced() && c.svcController.HasSynced() && c.epController.HasSynced() { - watchCh <- event +func (c *clientImpl) fireEvent(event interface{}, eventCh chan interface{}) { + if !c.ingController.HasSynced() || !c.svcController.HasSynced() || !c.epController.HasSynced() { + return } + eventHandlerFunc(eventCh, event) } // HasNamespace checks if the ingress is in one of the namespaces diff --git a/provider/kubernetes.go b/provider/kubernetes.go index af01db13c..1752cf686 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -52,7 +52,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage pool.Go(func(stop chan bool) { operation := func() error { for { - stopWatch := make(chan bool, 5) + stopWatch := make(chan struct{}, 1) defer close(stopWatch) log.Debugf("Using label selector: '%s'", provider.LabelSelector) eventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch) @@ -69,7 +69,6 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage for { select { case <-stop: - stopWatch <- true return nil case event := <-eventsChan: log.Debugf("Received event from kubernetes %+v", event) diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go index 4148a72c2..7b87da941 100644 --- a/provider/kubernetes_test.go +++ b/provider/kubernetes_test.go @@ -1277,10 +1277,6 @@ func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress { return result } -func (c clientMock) WatchIngresses(labelSelector string, stopCh <-chan struct{}) chan interface{} { - return c.watchChan -} - func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error) { for _, service := range c.services { if service.Namespace == namespace && service.Name == name { @@ -1299,6 +1295,6 @@ func (c clientMock) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, e return &v1.Endpoints{}, true, nil } -func (c clientMock) WatchAll(labelString string, stopCh <-chan bool) (chan interface{}, error) { +func (c clientMock) WatchAll(labelString string, stopCh <-chan struct{}) (<-chan interface{}, error) { return c.watchChan, nil }