Kubernetes: cleanup channel handling

Only use one channel for all watches
Re-use stop channel from the provider
Skip events that have already been handled by the provider, builds on 007f8cc48ea9504bb7754c5e3244124be422f47d
This commit is contained in:
Yves Peter 2016-12-03 21:20:39 +01:00
parent 87eac1dc1a
commit fc788eb426
3 changed files with 20 additions and 40 deletions

View file

@ -24,7 +24,7 @@ type Client interface {
GetIngresses(namespaces Namespaces) []*v1beta1.Ingress GetIngresses(namespaces Namespaces) []*v1beta1.Ingress
GetService(namespace, name string) (*v1.Service, bool, error) GetService(namespace, name string) (*v1.Service, bool, error)
GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error)
WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error) WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error)
} }
type clientImpl struct { type clientImpl struct {
@ -90,9 +90,7 @@ func (c *clientImpl) GetIngresses(namespaces Namespaces) []*v1beta1.Ingress {
} }
// WatchIngresses starts the watch of Kubernetes Ingresses resources and updates the corresponding store // WatchIngresses starts the watch of Kubernetes Ingresses resources and updates the corresponding store
func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan struct{}) chan interface{} { func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, watchCh chan<- interface{}, stopCh <-chan struct{}) {
watchCh := make(chan interface{}, 10)
source := NewListWatchFromClient( source := NewListWatchFromClient(
c.clientset.ExtensionsClient, c.clientset.ExtensionsClient,
"ingresses", "ingresses",
@ -106,21 +104,19 @@ func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan
resyncPeriod, resyncPeriod,
newResourceEventHandlerFuncs(watchCh)) newResourceEventHandlerFuncs(watchCh))
go c.ingController.Run(stopCh) go c.ingController.Run(stopCh)
return watchCh
} }
// eventHandlerFunc will pass the obj on to the events channel or drop it // eventHandlerFunc will pass the obj on to the events channel or drop it
// This is so passing the events along won't block in the case of high volume // This is so passing the events along won't block in the case of high volume
// The events are only used for signalling anyway so dropping a few is ok // The events are only used for signalling anyway so dropping a few is ok
func eventHandlerFunc(events chan interface{}, obj interface{}) { func eventHandlerFunc(events chan<- interface{}, obj interface{}) {
select { select {
case events <- obj: case events <- obj:
default: default:
} }
} }
func newResourceEventHandlerFuncs(events chan interface{}) cache.ResourceEventHandlerFuncs { func newResourceEventHandlerFuncs(events chan<- interface{}) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{ return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { eventHandlerFunc(events, obj) }, AddFunc: func(obj interface{}) { eventHandlerFunc(events, obj) },
UpdateFunc: func(old, new interface{}) { eventHandlerFunc(events, new) }, UpdateFunc: func(old, new interface{}) { eventHandlerFunc(events, new) },
@ -140,9 +136,7 @@ func (c *clientImpl) GetService(namespace, name string) (*v1.Service, bool, erro
} }
// WatchServices starts the watch of Kubernetes Service resources and updates the corresponding store // WatchServices starts the watch of Kubernetes Service resources and updates the corresponding store
func (c *clientImpl) WatchServices(stopCh <-chan struct{}) chan interface{} { func (c *clientImpl) WatchServices(watchCh chan<- interface{}, stopCh <-chan struct{}) {
watchCh := make(chan interface{}, 10)
source := cache.NewListWatchFromClient( source := cache.NewListWatchFromClient(
c.clientset.CoreClient, c.clientset.CoreClient,
"services", "services",
@ -155,8 +149,6 @@ func (c *clientImpl) WatchServices(stopCh <-chan struct{}) chan interface{} {
resyncPeriod, resyncPeriod,
newResourceEventHandlerFuncs(watchCh)) newResourceEventHandlerFuncs(watchCh))
go c.svcController.Run(stopCh) go c.svcController.Run(stopCh)
return watchCh
} }
// GetEndpoints returns the named Endpoints // GetEndpoints returns the named Endpoints
@ -173,9 +165,7 @@ func (c *clientImpl) GetEndpoints(namespace, name string) (*v1.Endpoints, bool,
} }
// WatchEndpoints starts the watch of Kubernetes Endpoints resources and updates the corresponding store // WatchEndpoints starts the watch of Kubernetes Endpoints resources and updates the corresponding store
func (c *clientImpl) WatchEndpoints(stopCh <-chan struct{}) chan interface{} { func (c *clientImpl) WatchEndpoints(watchCh chan<- interface{}, stopCh <-chan struct{}) {
watchCh := make(chan interface{}, 10)
source := cache.NewListWatchFromClient( source := cache.NewListWatchFromClient(
c.clientset.CoreClient, c.clientset.CoreClient,
"endpoints", "endpoints",
@ -188,38 +178,32 @@ func (c *clientImpl) WatchEndpoints(stopCh <-chan struct{}) chan interface{} {
resyncPeriod, resyncPeriod,
newResourceEventHandlerFuncs(watchCh)) newResourceEventHandlerFuncs(watchCh))
go c.epController.Run(stopCh) go c.epController.Run(stopCh)
return watchCh
} }
// WatchAll returns events in the cluster and updates the stores via informer // WatchAll returns events in the cluster and updates the stores via informer
// Filters ingresses by labelSelector // Filters ingresses by labelSelector
func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error) { func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) {
watchCh := make(chan interface{}, 100) watchCh := make(chan interface{}, 1)
stopWatchCh := make(chan struct{}, 1) eventCh := make(chan interface{}, 1)
kubeLabelSelector, err := labels.Parse(labelSelector) kubeLabelSelector, err := labels.Parse(labelSelector)
if err != nil { if err != nil {
return nil, err return nil, err
} }
chanIngresses := c.WatchIngresses(kubeLabelSelector, stopWatchCh) c.WatchIngresses(kubeLabelSelector, eventCh, stopCh)
chanServices := c.WatchServices(stopWatchCh) c.WatchServices(eventCh, stopCh)
chanEndpoints := c.WatchEndpoints(stopWatchCh) c.WatchEndpoints(eventCh, stopCh)
go func() { go func() {
defer close(stopWatchCh)
defer close(watchCh) defer close(watchCh)
defer close(eventCh)
for { for {
select { select {
case <-stopCh: case <-stopCh:
return return
case event := <-chanIngresses: case event := <-eventCh:
c.fireEvent(event, watchCh)
case event := <-chanServices:
c.fireEvent(event, watchCh)
case event := <-chanEndpoints:
c.fireEvent(event, watchCh) c.fireEvent(event, watchCh)
} }
} }
@ -230,10 +214,11 @@ func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan in
// fireEvent checks if all controllers have synced before firing // fireEvent checks if all controllers have synced before firing
// Used after startup or a reconnect // Used after startup or a reconnect
func (c *clientImpl) fireEvent(event interface{}, watchCh chan interface{}) { func (c *clientImpl) fireEvent(event interface{}, eventCh chan interface{}) {
if c.ingController.HasSynced() && c.svcController.HasSynced() && c.epController.HasSynced() { if !c.ingController.HasSynced() || !c.svcController.HasSynced() || !c.epController.HasSynced() {
watchCh <- event return
} }
eventHandlerFunc(eventCh, event)
} }
// HasNamespace checks if the ingress is in one of the namespaces // HasNamespace checks if the ingress is in one of the namespaces

View file

@ -52,7 +52,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
pool.Go(func(stop chan bool) { pool.Go(func(stop chan bool) {
operation := func() error { operation := func() error {
for { for {
stopWatch := make(chan bool, 5) stopWatch := make(chan struct{}, 1)
defer close(stopWatch) defer close(stopWatch)
log.Debugf("Using label selector: '%s'", provider.LabelSelector) log.Debugf("Using label selector: '%s'", provider.LabelSelector)
eventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch) eventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
@ -69,7 +69,6 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
for { for {
select { select {
case <-stop: case <-stop:
stopWatch <- true
return nil return nil
case event := <-eventsChan: case event := <-eventsChan:
log.Debugf("Received event from kubernetes %+v", event) log.Debugf("Received event from kubernetes %+v", event)

View file

@ -1277,10 +1277,6 @@ func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress {
return result return result
} }
func (c clientMock) WatchIngresses(labelSelector string, stopCh <-chan struct{}) chan interface{} {
return c.watchChan
}
func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error) { func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error) {
for _, service := range c.services { for _, service := range c.services {
if service.Namespace == namespace && service.Name == name { if service.Namespace == namespace && service.Name == name {
@ -1299,6 +1295,6 @@ func (c clientMock) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, e
return &v1.Endpoints{}, true, nil return &v1.Endpoints{}, true, nil
} }
func (c clientMock) WatchAll(labelString string, stopCh <-chan bool) (chan interface{}, error) { func (c clientMock) WatchAll(labelString string, stopCh <-chan struct{}) (<-chan interface{}, error) {
return c.watchChan, nil return c.watchChan, nil
} }