Merge pull request #387 from containous/fix-k8s-memory-leak

Fix k8s memory leak
This commit is contained in:
Vincent Demeester 2016-05-25 09:57:42 +02:00
commit ed3bcc6d9a
7 changed files with 80 additions and 47 deletions

4
cmd.go
View file

@ -94,7 +94,8 @@ var arguments = struct {
func init() { func init() {
traefikCmd.AddCommand(versionCmd) traefikCmd.AddCommand(versionCmd)
traefikCmd.PersistentFlags().StringP("configFile", "c", "", "Configuration file to use (TOML, JSON, YAML, HCL).") traefikCmd.PersistentFlags().StringP("configFile", "c", "", "Configuration file to use (TOML).")
traefikCmd.PersistentFlags().BoolVarP(&arguments.Debug, "debug", "d", false, "Enable debug mode")
traefikCmd.PersistentFlags().StringP("graceTimeOut", "g", "10", "Timeout in seconds. Duration to give active requests a chance to finish during hot-reloads") traefikCmd.PersistentFlags().StringP("graceTimeOut", "g", "10", "Timeout in seconds. Duration to give active requests a chance to finish during hot-reloads")
traefikCmd.PersistentFlags().String("accessLogsFile", "log/access.log", "Access logs file") traefikCmd.PersistentFlags().String("accessLogsFile", "log/access.log", "Access logs file")
traefikCmd.PersistentFlags().String("traefikLogsFile", "log/traefik.log", "Traefik logs file") traefikCmd.PersistentFlags().String("traefikLogsFile", "log/traefik.log", "Traefik logs file")
@ -178,6 +179,7 @@ func init() {
_ = viper.BindPFlag("configFile", traefikCmd.PersistentFlags().Lookup("configFile")) _ = viper.BindPFlag("configFile", traefikCmd.PersistentFlags().Lookup("configFile"))
_ = viper.BindPFlag("graceTimeOut", traefikCmd.PersistentFlags().Lookup("graceTimeOut")) _ = viper.BindPFlag("graceTimeOut", traefikCmd.PersistentFlags().Lookup("graceTimeOut"))
_ = viper.BindPFlag("logLevel", traefikCmd.PersistentFlags().Lookup("logLevel")) _ = viper.BindPFlag("logLevel", traefikCmd.PersistentFlags().Lookup("logLevel"))
_ = viper.BindPFlag("debug", traefikCmd.PersistentFlags().Lookup("debug"))
// TODO: wait for this issue to be corrected: https://github.com/spf13/viper/issues/105 // TODO: wait for this issue to be corrected: https://github.com/spf13/viper/issues/105
_ = viper.BindPFlag("providersThrottleDuration", traefikCmd.PersistentFlags().Lookup("providersThrottleDuration")) _ = viper.BindPFlag("providersThrottleDuration", traefikCmd.PersistentFlags().Lookup("providersThrottleDuration"))
_ = viper.BindPFlag("maxIdleConnsPerHost", traefikCmd.PersistentFlags().Lookup("maxIdleConnsPerHost")) _ = viper.BindPFlag("maxIdleConnsPerHost", traefikCmd.PersistentFlags().Lookup("maxIdleConnsPerHost"))

View file

@ -19,6 +19,7 @@ import (
// It's populated from the traefik configuration file passed as an argument to the binary. // It's populated from the traefik configuration file passed as an argument to the binary.
type GlobalConfiguration struct { type GlobalConfiguration struct {
GraceTimeOut int64 GraceTimeOut int64
Debug bool
AccessLogsFile string AccessLogsFile string
TraefikLogsFile string TraefikLogsFile string
LogLevel string LogLevel string

View file

@ -1,8 +1,3 @@
# etcd:
# image: gcr.io/google_containers/etcd:2.2.1
# net: host
# command: ['/usr/local/bin/etcd', '--addr=127.0.0.1:4001', '--bind-addr=0.0.0.0:4001', '--data-dir=/var/etcd/data']
kubelet: kubelet:
image: gcr.io/google_containers/hyperkube-amd64:v1.2.2 image: gcr.io/google_containers/hyperkube-amd64:v1.2.2
privileged: true privileged: true

View file

@ -5,12 +5,10 @@ import (
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/parnurzeal/gorequest"
"net/http" "net/http"
"net/url" "net/url"
"strings" "strings"
"github.com/containous/traefik/safe"
"github.com/parnurzeal/gorequest"
) )
const ( const (
@ -126,8 +124,8 @@ func (c *clientImpl) WatchReplicationControllers(stopCh <-chan bool) (chan inter
// WatchAll returns events in the cluster // WatchAll returns events in the cluster
func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) {
watchCh := make(chan interface{}) watchCh := make(chan interface{}, 10)
errCh := make(chan error) errCh := make(chan error, 10)
stopIngresses := make(chan bool) stopIngresses := make(chan bool)
chanIngresses, chanIngressesErr, err := c.WatchIngresses(stopIngresses) chanIngresses, chanIngressesErr, err := c.WatchIngresses(stopIngresses)
@ -164,7 +162,7 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error,
stopServices <- true stopServices <- true
stopPods <- true stopPods <- true
stopReplicationControllers <- true stopReplicationControllers <- true
break return
case err := <-chanIngressesErr: case err := <-chanIngressesErr:
errCh <- err errCh <- err
case err := <-chanServicesErr: case err := <-chanServicesErr:
@ -193,6 +191,7 @@ func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) {
if errs != nil { if errs != nil {
return nil, fmt.Errorf("failed to create request: GET %q : %v", request.Url, errs) return nil, fmt.Errorf("failed to create request: GET %q : %v", request.Url, errs)
} }
defer res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, request.Url, string(body)) return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, request.Url, string(body))
} }
@ -202,6 +201,7 @@ func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) {
func (c *clientImpl) request(url string) *gorequest.SuperAgent { func (c *clientImpl) request(url string) *gorequest.SuperAgent {
// Make request to Kubernetes API // Make request to Kubernetes API
request := gorequest.New().Get(url) request := gorequest.New().Get(url)
request.Transport.DisableKeepAlives = true
if strings.HasPrefix(url, "http://") { if strings.HasPrefix(url, "http://") {
return request return request
@ -223,8 +223,8 @@ type GenericObject struct {
} }
func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, chan error, error) {
watchCh := make(chan interface{}) watchCh := make(chan interface{}, 10)
errCh := make(chan error) errCh := make(chan error, 10)
// get version // get version
body, err := c.do(c.request(url)) body, err := c.do(c.request(url))
@ -246,35 +246,39 @@ func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, ch
return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err) return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err)
} }
request.Client.Transport = request.Transport request.Client.Transport = request.Transport
res, err := request.Client.Do(req) res, err := request.Client.Do(req)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to do watch request: GET %q: %v", url, err) return watchCh, errCh, fmt.Errorf("failed to do watch request: GET %q: %v", url, err)
} }
shouldStop := safe.New(false)
go func() {
select {
case <-stopCh:
shouldStop.Set(true)
res.Body.Close()
return
}
}()
go func() { go func() {
finishCh := make(chan bool)
defer close(finishCh)
defer close(watchCh) defer close(watchCh)
defer close(errCh) defer close(errCh)
go func() {
defer res.Body.Close()
for { for {
var eventList interface{} var eventList interface{}
if err := json.NewDecoder(res.Body).Decode(&eventList); err != nil { if err := json.NewDecoder(res.Body).Decode(&eventList); err != nil {
if !shouldStop.Get().(bool) { if !strings.Contains(err.Error(), "net/http: request canceled") {
errCh <- fmt.Errorf("failed to decode watch event: %v", err) errCh <- fmt.Errorf("failed to decode watch event: GET %q : %v", url, err)
} }
finishCh <- true
return return
} }
watchCh <- eventList watchCh <- eventList
} }
}() }()
select {
case <-stopCh:
go func() {
request.Transport.CancelRequest(req)
}()
<-finishCh
return
}
}()
return watchCh, errCh, nil return watchCh, errCh, nil
} }

View file

@ -62,19 +62,20 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
backOff := backoff.NewExponentialBackOff() backOff := backoff.NewExponentialBackOff()
pool.Go(func(stop chan bool) { pool.Go(func(stop chan bool) {
stopWatch := make(chan bool)
defer close(stopWatch)
operation := func() error { operation := func() error {
select {
case <-stop:
return nil
default:
}
for { for {
stopWatch := make(chan bool, 5)
defer close(stopWatch)
eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch) eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch)
if err != nil { if err != nil {
log.Errorf("Error watching kubernetes events: %v", err) log.Errorf("Error watching kubernetes events: %v", err)
timer := time.NewTimer(1 * time.Second)
select {
case <-timer.C:
return err return err
case <-stop:
return nil
}
} }
Watch: Watch:
for { for {
@ -82,14 +83,15 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
case <-stop: case <-stop:
stopWatch <- true stopWatch <- true
return nil return nil
case err := <-errEventsChan: case err, ok := <-errEventsChan:
if strings.Contains(err.Error(), io.EOF.Error()) { stopWatch <- true
if ok && strings.Contains(err.Error(), io.EOF.Error()) {
// edge case, kubernetes long-polling disconnection // edge case, kubernetes long-polling disconnection
break Watch break Watch
} }
return err return err
case event := <-eventsChan: case event := <-eventsChan:
log.Debugf("Received event from kubenetes %+v", event) log.Debugf("Received event from kubernetes %+v", event)
templateObjects, err := provider.loadIngresses(k8sClient) templateObjects, err := provider.loadIngresses(k8sClient)
if err != nil { if err != nil {
return err return err
@ -190,13 +192,13 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
return service.ObjectMeta.Namespace == i.ObjectMeta.Namespace && service.Name == pa.Backend.ServiceName return service.ObjectMeta.Namespace == i.ObjectMeta.Namespace && service.Name == pa.Backend.ServiceName
}) })
if err != nil { if err != nil {
log.Errorf("Error retrieving services: %v", err) log.Warnf("Error retrieving services: %v", err)
continue continue
} }
if len(services) == 0 { if len(services) == 0 {
// no backends found, delete frontend... // no backends found, delete frontend...
delete(templateObjects.Frontends, r.Host+pa.Path) delete(templateObjects.Frontends, r.Host+pa.Path)
log.Errorf("Error retrieving services %s", pa.Backend.ServiceName) log.Warnf("Error retrieving services %s", pa.Backend.ServiceName)
} }
for _, service := range services { for _, service := range services {
protocol := "http" protocol := "http"

View file

@ -68,8 +68,8 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server {
server := new(Server) server := new(Server)
server.serverEntryPoints = make(map[string]*serverEntryPoint) server.serverEntryPoints = make(map[string]*serverEntryPoint)
server.configurationChan = make(chan types.ConfigMessage, 10) server.configurationChan = make(chan types.ConfigMessage, 100)
server.configurationValidatedChan = make(chan types.ConfigMessage, 10) server.configurationValidatedChan = make(chan types.ConfigMessage, 100)
server.signals = make(chan os.Signal, 1) server.signals = make(chan os.Signal, 1)
server.stopChan = make(chan bool, 1) server.stopChan = make(chan bool, 1)
server.providers = []provider.Provider{} server.providers = []provider.Provider{}

29
web.go
View file

@ -2,9 +2,11 @@ package main
import ( import (
"encoding/json" "encoding/json"
"expvar"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"runtime"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/containous/traefik/autogen" "github.com/containous/traefik/autogen"
@ -33,6 +35,14 @@ var (
}) })
) )
func init() {
expvar.Publish("Goroutines", expvar.Func(goroutines))
}
func goroutines() interface{} {
return runtime.NumGoroutine()
}
// Provide allows the provider to provide configurations to traefik // Provide allows the provider to provide configurations to traefik
// using the given configuration channel. // using the given configuration channel.
func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
@ -84,6 +94,11 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag
}) })
systemRouter.Methods("GET").PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", http.FileServer(&assetfs.AssetFS{Asset: autogen.Asset, AssetDir: autogen.AssetDir, Prefix: "static"}))) systemRouter.Methods("GET").PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", http.FileServer(&assetfs.AssetFS{Asset: autogen.Asset, AssetDir: autogen.AssetDir, Prefix: "static"})))
// expvars
if provider.server.globalConfiguration.Debug {
systemRouter.Methods("GET").Path("/debug/vars").HandlerFunc(expvarHandler)
}
go func() { go func() {
if len(provider.CertFile) > 0 && len(provider.KeyFile) > 0 { if len(provider.CertFile) > 0 && len(provider.KeyFile) > 0 {
err := http.ListenAndServeTLS(provider.Address, provider.CertFile, provider.KeyFile, systemRouter) err := http.ListenAndServeTLS(provider.Address, provider.CertFile, provider.KeyFile, systemRouter)
@ -231,3 +246,17 @@ func (provider *WebProvider) getRouteHandler(response http.ResponseWriter, reque
} }
http.NotFound(response, request) http.NotFound(response, request)
} }
func expvarHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
fmt.Fprintf(w, "{\n")
first := true
expvar.Do(func(kv expvar.KeyValue) {
if !first {
fmt.Fprintf(w, ",\n")
}
first = false
fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value)
})
fmt.Fprintf(w, "\n}\n")
}