Fix k8s memory leak
Signed-off-by: Emile Vauge <emile@vauge.com>
This commit is contained in:
parent
2af1e4b192
commit
0f23581f64
4 changed files with 52 additions and 46 deletions
|
@ -5,12 +5,10 @@ import (
|
|||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/parnurzeal/gorequest"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/containous/traefik/safe"
|
||||
"github.com/parnurzeal/gorequest"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -126,8 +124,8 @@ func (c *clientImpl) WatchReplicationControllers(stopCh <-chan bool) (chan inter
|
|||
|
||||
// WatchAll returns events in the cluster
|
||||
func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||
watchCh := make(chan interface{})
|
||||
errCh := make(chan error)
|
||||
watchCh := make(chan interface{}, 10)
|
||||
errCh := make(chan error, 10)
|
||||
|
||||
stopIngresses := make(chan bool)
|
||||
chanIngresses, chanIngressesErr, err := c.WatchIngresses(stopIngresses)
|
||||
|
@ -164,7 +162,7 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error,
|
|||
stopServices <- true
|
||||
stopPods <- true
|
||||
stopReplicationControllers <- true
|
||||
break
|
||||
return
|
||||
case err := <-chanIngressesErr:
|
||||
errCh <- err
|
||||
case err := <-chanServicesErr:
|
||||
|
@ -193,6 +191,7 @@ func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) {
|
|||
if errs != nil {
|
||||
return nil, fmt.Errorf("failed to create request: GET %q : %v", request.Url, errs)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, request.Url, string(body))
|
||||
}
|
||||
|
@ -202,6 +201,7 @@ func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) {
|
|||
func (c *clientImpl) request(url string) *gorequest.SuperAgent {
|
||||
// Make request to Kubernetes API
|
||||
request := gorequest.New().Get(url)
|
||||
request.Transport.DisableKeepAlives = true
|
||||
|
||||
if strings.HasPrefix(url, "http://") {
|
||||
return request
|
||||
|
@ -223,8 +223,8 @@ type GenericObject struct {
|
|||
}
|
||||
|
||||
func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||
watchCh := make(chan interface{})
|
||||
errCh := make(chan error)
|
||||
watchCh := make(chan interface{}, 10)
|
||||
errCh := make(chan error, 10)
|
||||
|
||||
// get version
|
||||
body, err := c.do(c.request(url))
|
||||
|
@ -246,35 +246,39 @@ func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, ch
|
|||
return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err)
|
||||
}
|
||||
request.Client.Transport = request.Transport
|
||||
|
||||
res, err := request.Client.Do(req)
|
||||
if err != nil {
|
||||
return watchCh, errCh, fmt.Errorf("failed to do watch request: GET %q: %v", url, err)
|
||||
}
|
||||
|
||||
shouldStop := safe.New(false)
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-stopCh:
|
||||
shouldStop.Set(true)
|
||||
res.Body.Close()
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
finishCh := make(chan bool)
|
||||
defer close(finishCh)
|
||||
defer close(watchCh)
|
||||
defer close(errCh)
|
||||
go func() {
|
||||
defer res.Body.Close()
|
||||
for {
|
||||
var eventList interface{}
|
||||
if err := json.NewDecoder(res.Body).Decode(&eventList); err != nil {
|
||||
if !shouldStop.Get().(bool) {
|
||||
errCh <- fmt.Errorf("failed to decode watch event: %v", err)
|
||||
if !strings.Contains(err.Error(), "net/http: request canceled") {
|
||||
errCh <- fmt.Errorf("failed to decode watch event: GET %q : %v", url, err)
|
||||
}
|
||||
finishCh <- true
|
||||
return
|
||||
}
|
||||
watchCh <- eventList
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-stopCh:
|
||||
go func() {
|
||||
request.Transport.CancelRequest(req)
|
||||
}()
|
||||
<-finishCh
|
||||
return
|
||||
}
|
||||
}()
|
||||
return watchCh, errCh, nil
|
||||
}
|
||||
|
|
|
@ -62,19 +62,20 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
|
|||
backOff := backoff.NewExponentialBackOff()
|
||||
|
||||
pool.Go(func(stop chan bool) {
|
||||
stopWatch := make(chan bool)
|
||||
defer close(stopWatch)
|
||||
operation := func() error {
|
||||
select {
|
||||
case <-stop:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
for {
|
||||
stopWatch := make(chan bool, 5)
|
||||
defer close(stopWatch)
|
||||
eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch)
|
||||
if err != nil {
|
||||
log.Errorf("Error watching kubernetes events: %v", err)
|
||||
timer := time.NewTimer(1 * time.Second)
|
||||
select {
|
||||
case <-timer.C:
|
||||
return err
|
||||
case <-stop:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
Watch:
|
||||
for {
|
||||
|
@ -82,14 +83,15 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
|
|||
case <-stop:
|
||||
stopWatch <- true
|
||||
return nil
|
||||
case err := <-errEventsChan:
|
||||
if strings.Contains(err.Error(), io.EOF.Error()) {
|
||||
case err, ok := <-errEventsChan:
|
||||
stopWatch <- true
|
||||
if ok && strings.Contains(err.Error(), io.EOF.Error()) {
|
||||
// edge case, kubernetes long-polling disconnection
|
||||
break Watch
|
||||
}
|
||||
return err
|
||||
case event := <-eventsChan:
|
||||
log.Debugf("Received event from kubenetes %+v", event)
|
||||
log.Debugf("Received event from kubernetes %+v", event)
|
||||
templateObjects, err := provider.loadIngresses(k8sClient)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -190,13 +192,13 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
|
|||
return service.ObjectMeta.Namespace == i.ObjectMeta.Namespace && service.Name == pa.Backend.ServiceName
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("Error retrieving services: %v", err)
|
||||
log.Warnf("Error retrieving services: %v", err)
|
||||
continue
|
||||
}
|
||||
if len(services) == 0 {
|
||||
// no backends found, delete frontend...
|
||||
delete(templateObjects.Frontends, r.Host+pa.Path)
|
||||
log.Errorf("Error retrieving services %s", pa.Backend.ServiceName)
|
||||
log.Warnf("Error retrieving services %s", pa.Backend.ServiceName)
|
||||
}
|
||||
for _, service := range services {
|
||||
protocol := "http"
|
||||
|
|
|
@ -68,8 +68,8 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server {
|
|||
server := new(Server)
|
||||
|
||||
server.serverEntryPoints = make(map[string]*serverEntryPoint)
|
||||
server.configurationChan = make(chan types.ConfigMessage, 10)
|
||||
server.configurationValidatedChan = make(chan types.ConfigMessage, 10)
|
||||
server.configurationChan = make(chan types.ConfigMessage, 100)
|
||||
server.configurationValidatedChan = make(chan types.ConfigMessage, 100)
|
||||
server.signals = make(chan os.Signal, 1)
|
||||
server.stopChan = make(chan bool, 1)
|
||||
server.providers = []provider.Provider{}
|
||||
|
|
10
web.go
10
web.go
|
@ -94,6 +94,11 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag
|
|||
})
|
||||
systemRouter.Methods("GET").PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", http.FileServer(&assetfs.AssetFS{Asset: autogen.Asset, AssetDir: autogen.AssetDir, Prefix: "static"})))
|
||||
|
||||
// expvars
|
||||
if provider.server.globalConfiguration.Debug {
|
||||
systemRouter.Methods("GET").Path("/debug/vars").HandlerFunc(expvarHandler)
|
||||
}
|
||||
|
||||
go func() {
|
||||
if len(provider.CertFile) > 0 && len(provider.KeyFile) > 0 {
|
||||
err := http.ListenAndServeTLS(provider.Address, provider.CertFile, provider.KeyFile, systemRouter)
|
||||
|
@ -107,11 +112,6 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag
|
|||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// expvars
|
||||
if provider.server.globalConfiguration.Debug {
|
||||
systemRouter.Methods("GET").Path("/debug/vars").HandlerFunc(expvarHandler)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue