From 2dda3d2febe573e91eb82c7a2900c9306c7f065f Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Tue, 15 Nov 2016 20:36:56 +0100 Subject: [PATCH] Fix Kubernetes watch leak Signed-off-by: Emile Vauge --- provider/k8s/client.go | 29 +++++++++++++---------------- provider/kubernetes.go | 1 + 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/provider/k8s/client.go b/provider/k8s/client.go index ad9679cae..4a6bae919 100644 --- a/provider/k8s/client.go +++ b/provider/k8s/client.go @@ -1,6 +1,7 @@ package k8s import ( + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -141,20 +142,20 @@ func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan // WatchAll returns events in the cluster func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) { - watchCh := make(chan interface{}, 10) - errCh := make(chan error, 10) + watchCh := make(chan interface{}, 100) + errCh := make(chan error, 100) - stopIngresses := make(chan bool) + stopIngresses := make(chan bool, 10) chanIngresses, chanIngressesErr, err := c.WatchIngresses(labelSelector, stopIngresses) if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) } - stopServices := make(chan bool) + stopServices := make(chan bool, 10) chanServices, chanServicesErr, err := c.WatchServices(stopServices) if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) } - stopEndpoints := make(chan bool) + stopEndpoints := make(chan bool, 10) chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints) if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) @@ -257,16 +258,17 @@ func (c *clientImpl) watch(url string, labelSelector string, stopCh <-chan bool) if err != nil { return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err) } + ctx, cancel := context.WithCancel(context.Background()) + req = req.WithContext(ctx) request.Client.Transport = request.Transport res, err := request.Client.Do(req) if err != nil { + cancel() return watchCh, errCh, fmt.Errorf("failed to do watch request: GET %q: %v", url, err) } go func() { - finishCh := make(chan bool) - defer close(finishCh) defer close(watchCh) defer close(errCh) go func() { @@ -277,20 +279,15 @@ func (c *clientImpl) watch(url string, labelSelector string, stopCh <-chan bool) if !strings.Contains(err.Error(), "net/http: request canceled") { errCh <- fmt.Errorf("failed to decode watch event: GET %q : %v", url, err) } - finishCh <- true return } watchCh <- eventList } }() - select { - case <-stopCh: - go func() { - request.Transport.CancelRequest(req) - }() - <-finishCh - return - } + <-stopCh + go func() { + cancel() // cancel watch request + }() }() return watchCh, errCh, nil } diff --git a/provider/kubernetes.go b/provider/kubernetes.go index b50216636..a08014eff 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -130,6 +130,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage log.Debugf("Received event from kubernetes %+v", event) templateObjects, err := provider.loadIngresses(k8sClient) if err != nil { + stopWatch <- true return err } if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) {