Fix Kubernetes watch leak

Signed-off-by: Emile Vauge <emile@vauge.com>
This commit is contained in:
Emile Vauge 2016-11-15 20:36:56 +01:00
parent 22ebaedb45
commit 2dda3d2feb
No known key found for this signature in database
GPG key ID: D808B4C167352E59
2 changed files with 14 additions and 16 deletions

View file

@ -1,6 +1,7 @@
package k8s package k8s
import ( import (
"context"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
@ -141,20 +142,20 @@ func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan
// WatchAll returns events in the cluster // WatchAll returns events in the cluster
func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
watchCh := make(chan interface{}, 10) watchCh := make(chan interface{}, 100)
errCh := make(chan error, 10) errCh := make(chan error, 100)
stopIngresses := make(chan bool) stopIngresses := make(chan bool, 10)
chanIngresses, chanIngressesErr, err := c.WatchIngresses(labelSelector, stopIngresses) chanIngresses, chanIngressesErr, err := c.WatchIngresses(labelSelector, stopIngresses)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
} }
stopServices := make(chan bool) stopServices := make(chan bool, 10)
chanServices, chanServicesErr, err := c.WatchServices(stopServices) chanServices, chanServicesErr, err := c.WatchServices(stopServices)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
} }
stopEndpoints := make(chan bool) stopEndpoints := make(chan bool, 10)
chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints) chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
@ -257,16 +258,17 @@ func (c *clientImpl) watch(url string, labelSelector string, stopCh <-chan bool)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err) return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err)
} }
ctx, cancel := context.WithCancel(context.Background())
req = req.WithContext(ctx)
request.Client.Transport = request.Transport request.Client.Transport = request.Transport
res, err := request.Client.Do(req) res, err := request.Client.Do(req)
if err != nil { if err != nil {
cancel()
return watchCh, errCh, fmt.Errorf("failed to do watch request: GET %q: %v", url, err) return watchCh, errCh, fmt.Errorf("failed to do watch request: GET %q: %v", url, err)
} }
go func() { go func() {
finishCh := make(chan bool)
defer close(finishCh)
defer close(watchCh) defer close(watchCh)
defer close(errCh) defer close(errCh)
go func() { go func() {
@ -277,20 +279,15 @@ func (c *clientImpl) watch(url string, labelSelector string, stopCh <-chan bool)
if !strings.Contains(err.Error(), "net/http: request canceled") { if !strings.Contains(err.Error(), "net/http: request canceled") {
errCh <- fmt.Errorf("failed to decode watch event: GET %q : %v", url, err) errCh <- fmt.Errorf("failed to decode watch event: GET %q : %v", url, err)
} }
finishCh <- true
return return
} }
watchCh <- eventList watchCh <- eventList
} }
}() }()
select { <-stopCh
case <-stopCh: go func() {
go func() { cancel() // cancel watch request
request.Transport.CancelRequest(req) }()
}()
<-finishCh
return
}
}() }()
return watchCh, errCh, nil return watchCh, errCh, nil
} }

View file

@ -130,6 +130,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
log.Debugf("Received event from kubernetes %+v", event) log.Debugf("Received event from kubernetes %+v", event)
templateObjects, err := provider.loadIngresses(k8sClient) templateObjects, err := provider.loadIngresses(k8sClient)
if err != nil { if err != nil {
stopWatch <- true
return err return err
} }
if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) { if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) {