Merge pull request #932 from yvespp/master

Kubernetes: cleanup channel handling
This commit is contained in:
Vincent Demeester 2016-12-08 11:23:21 +01:00 committed by GitHub
commit c500873586
3 changed files with 32 additions and 44 deletions

View file

@ -24,7 +24,7 @@ type Client interface {
GetIngresses(namespaces Namespaces) []*v1beta1.Ingress
GetService(namespace, name string) (*v1.Service, bool, error)
GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error)
WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error)
WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error)
}
type clientImpl struct {
@ -90,9 +90,7 @@ func (c *clientImpl) GetIngresses(namespaces Namespaces) []*v1beta1.Ingress {
}
// WatchIngresses starts the watch of Kubernetes Ingresses resources and updates the corresponding store
func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan struct{}) chan interface{} {
watchCh := make(chan interface{}, 10)
func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := NewListWatchFromClient(
c.clientset.ExtensionsClient,
"ingresses",
@ -106,18 +104,24 @@ func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan
resyncPeriod,
newResourceEventHandlerFuncs(watchCh))
go c.ingController.Run(stopCh)
return watchCh
}
func newResourceEventHandlerFuncs(events chan interface{}) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { events <- obj },
UpdateFunc: func(old, new interface{}) { events <- new },
DeleteFunc: func(obj interface{}) { events <- obj },
// eventHandlerFunc will pass the obj on to the events channel or drop it
// This is so passing the events along won't block in the case of high volume
// The events are only used for signalling anyway so dropping a few is ok
func eventHandlerFunc(events chan<- interface{}, obj interface{}) {
select {
case events <- obj:
default:
}
}
func newResourceEventHandlerFuncs(events chan<- interface{}) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { eventHandlerFunc(events, obj) },
UpdateFunc: func(old, new interface{}) { eventHandlerFunc(events, new) },
DeleteFunc: func(obj interface{}) { eventHandlerFunc(events, obj) },
}
}
// GetService returns the named service from the named namespace
@ -132,9 +136,7 @@ func (c *clientImpl) GetService(namespace, name string) (*v1.Service, bool, erro
}
// WatchServices starts the watch of Kubernetes Service resources and updates the corresponding store
func (c *clientImpl) WatchServices(stopCh <-chan struct{}) chan interface{} {
watchCh := make(chan interface{}, 10)
func (c *clientImpl) WatchServices(watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
c.clientset.CoreClient,
"services",
@ -147,8 +149,6 @@ func (c *clientImpl) WatchServices(stopCh <-chan struct{}) chan interface{} {
resyncPeriod,
newResourceEventHandlerFuncs(watchCh))
go c.svcController.Run(stopCh)
return watchCh
}
// GetEndpoints returns the named Endpoints
@ -165,9 +165,7 @@ func (c *clientImpl) GetEndpoints(namespace, name string) (*v1.Endpoints, bool,
}
// WatchEndpoints starts the watch of Kubernetes Endpoints resources and updates the corresponding store
func (c *clientImpl) WatchEndpoints(stopCh <-chan struct{}) chan interface{} {
watchCh := make(chan interface{}, 10)
func (c *clientImpl) WatchEndpoints(watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
c.clientset.CoreClient,
"endpoints",
@ -180,38 +178,32 @@ func (c *clientImpl) WatchEndpoints(stopCh <-chan struct{}) chan interface{} {
resyncPeriod,
newResourceEventHandlerFuncs(watchCh))
go c.epController.Run(stopCh)
return watchCh
}
// WatchAll returns events in the cluster and updates the stores via informer
// Filters ingresses by labelSelector
func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error) {
watchCh := make(chan interface{}, 100)
stopWatchCh := make(chan struct{}, 1)
func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) {
watchCh := make(chan interface{}, 1)
eventCh := make(chan interface{}, 1)
kubeLabelSelector, err := labels.Parse(labelSelector)
if err != nil {
return nil, err
}
chanIngresses := c.WatchIngresses(kubeLabelSelector, stopWatchCh)
chanServices := c.WatchServices(stopWatchCh)
chanEndpoints := c.WatchEndpoints(stopWatchCh)
c.WatchIngresses(kubeLabelSelector, eventCh, stopCh)
c.WatchServices(eventCh, stopCh)
c.WatchEndpoints(eventCh, stopCh)
go func() {
defer close(stopWatchCh)
defer close(watchCh)
defer close(eventCh)
for {
select {
case <-stopCh:
return
case event := <-chanIngresses:
c.fireEvent(event, watchCh)
case event := <-chanServices:
c.fireEvent(event, watchCh)
case event := <-chanEndpoints:
case event := <-eventCh:
c.fireEvent(event, watchCh)
}
}
@ -222,10 +214,11 @@ func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan in
// fireEvent checks if all controllers have synced before firing
// Used after startup or a reconnect
func (c *clientImpl) fireEvent(event interface{}, watchCh chan interface{}) {
if c.ingController.HasSynced() && c.svcController.HasSynced() && c.epController.HasSynced() {
watchCh <- event
func (c *clientImpl) fireEvent(event interface{}, eventCh chan interface{}) {
if !c.ingController.HasSynced() || !c.svcController.HasSynced() || !c.epController.HasSynced() {
return
}
eventHandlerFunc(eventCh, event)
}
// HasNamespace checks if the ingress is in one of the namespaces

View file

@ -52,7 +52,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
pool.Go(func(stop chan bool) {
operation := func() error {
for {
stopWatch := make(chan bool, 5)
stopWatch := make(chan struct{}, 1)
defer close(stopWatch)
log.Debugf("Using label selector: '%s'", provider.LabelSelector)
eventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
@ -69,7 +69,6 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
for {
select {
case <-stop:
stopWatch <- true
return nil
case event := <-eventsChan:
log.Debugf("Received event from kubernetes %+v", event)

View file

@ -1277,10 +1277,6 @@ func (c clientMock) GetIngresses(namespaces k8s.Namespaces) []*v1beta1.Ingress {
return result
}
func (c clientMock) WatchIngresses(labelSelector string, stopCh <-chan struct{}) chan interface{} {
return c.watchChan
}
func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error) {
for _, service := range c.services {
if service.Namespace == namespace && service.Name == name {
@ -1299,6 +1295,6 @@ func (c clientMock) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, e
return &v1.Endpoints{}, true, nil
}
func (c clientMock) WatchAll(labelString string, stopCh <-chan bool) (chan interface{}, error) {
func (c clientMock) WatchAll(labelString string, stopCh <-chan struct{}) (<-chan interface{}, error) {
return c.watchChan, nil
}