Merge pull request #845 from containous/fix-kubernetes-watch-leak
Fix Kubernetes watch leak
This commit is contained in:
commit
29f780863b
3 changed files with 22 additions and 19 deletions
|
@ -40,10 +40,15 @@ Run it and forget it!
|
|||
|
||||
You can have a quick look at Træfɪk in this [Katacoda tutorial](https://www.katacoda.com/courses/traefik/deploy-load-balancer) that shows how to load balance requests between multiple Docker containers.
|
||||
|
||||
Here is a talk (in french) given by [Emile Vauge](https://github.com/emilevauge) at the [Devoxx France 2016](http://www.devoxx.fr) conference.
|
||||
You will learn fundamental Træfɪk features and see some demos with Docker, Mesos/Marathon and Lets'Encrypt.
|
||||
Here is a talk given by [Ed Robinson](https://github.com/errm) at the [ContainerCamp UK](https://container.camp) conference.
|
||||
You will learn fundamental Træfɪk features and see some demos with Kubernetes.
|
||||
|
||||
[![Traefik Devoxx France](https://img.youtube.com/vi/QvAz9mVx5TI/0.jpg)](https://www.youtube.com/watch?v=QvAz9mVx5TI)
|
||||
[![Traefik ContainerCamp UK](http://img.youtube.com/vi/aFtpIShV60I/0.jpg)](https://www.youtube.com/watch?v=aFtpIShV60I)
|
||||
|
||||
Here is a talk (in French) given by [Emile Vauge](https://github.com/emilevauge) at the [Devoxx France 2016](http://www.devoxx.fr) conference.
|
||||
You will learn fundamental Træfɪk features and see some demos with Docker, Mesos/Marathon and Let's Encrypt.
|
||||
|
||||
[![Traefik Devoxx France](http://img.youtube.com/vi/QvAz9mVx5TI/0.jpg)](http://www.youtube.com/watch?v=QvAz9mVx5TI)
|
||||
|
||||
## Get it
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package k8s
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
|
@ -141,20 +142,20 @@ func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan
|
|||
|
||||
// WatchAll returns events in the cluster
|
||||
func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||
watchCh := make(chan interface{}, 10)
|
||||
errCh := make(chan error, 10)
|
||||
watchCh := make(chan interface{}, 100)
|
||||
errCh := make(chan error, 100)
|
||||
|
||||
stopIngresses := make(chan bool)
|
||||
stopIngresses := make(chan bool, 10)
|
||||
chanIngresses, chanIngressesErr, err := c.WatchIngresses(labelSelector, stopIngresses)
|
||||
if err != nil {
|
||||
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
||||
}
|
||||
stopServices := make(chan bool)
|
||||
stopServices := make(chan bool, 10)
|
||||
chanServices, chanServicesErr, err := c.WatchServices(stopServices)
|
||||
if err != nil {
|
||||
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
||||
}
|
||||
stopEndpoints := make(chan bool)
|
||||
stopEndpoints := make(chan bool, 10)
|
||||
chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints)
|
||||
if err != nil {
|
||||
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
||||
|
@ -257,16 +258,17 @@ func (c *clientImpl) watch(url string, labelSelector string, stopCh <-chan bool)
|
|||
if err != nil {
|
||||
return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err)
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
req = req.WithContext(ctx)
|
||||
request.Client.Transport = request.Transport
|
||||
|
||||
res, err := request.Client.Do(req)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return watchCh, errCh, fmt.Errorf("failed to do watch request: GET %q: %v", url, err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
finishCh := make(chan bool)
|
||||
defer close(finishCh)
|
||||
defer close(watchCh)
|
||||
defer close(errCh)
|
||||
go func() {
|
||||
|
@ -277,20 +279,15 @@ func (c *clientImpl) watch(url string, labelSelector string, stopCh <-chan bool)
|
|||
if !strings.Contains(err.Error(), "net/http: request canceled") {
|
||||
errCh <- fmt.Errorf("failed to decode watch event: GET %q : %v", url, err)
|
||||
}
|
||||
finishCh <- true
|
||||
return
|
||||
}
|
||||
watchCh <- eventList
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-stopCh:
|
||||
go func() {
|
||||
request.Transport.CancelRequest(req)
|
||||
}()
|
||||
<-finishCh
|
||||
return
|
||||
}
|
||||
<-stopCh
|
||||
go func() {
|
||||
cancel() // cancel watch request
|
||||
}()
|
||||
}()
|
||||
return watchCh, errCh, nil
|
||||
}
|
||||
|
|
|
@ -130,6 +130,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
|
|||
log.Debugf("Received event from kubernetes %+v", event)
|
||||
templateObjects, err := provider.loadIngresses(k8sClient)
|
||||
if err != nil {
|
||||
stopWatch <- true
|
||||
return err
|
||||
}
|
||||
if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) {
|
||||
|
|
Loading…
Reference in a new issue