From 86f3891a2bd9bd3735b3adfb66f8160891fe7e3e Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Thu, 19 May 2016 20:06:33 +0200 Subject: [PATCH 1/4] Add debug flag Signed-off-by: Emile Vauge --- cmd.go | 4 +++- configuration.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd.go b/cmd.go index 573382ea6..6580d6a15 100644 --- a/cmd.go +++ b/cmd.go @@ -94,7 +94,8 @@ var arguments = struct { func init() { traefikCmd.AddCommand(versionCmd) - traefikCmd.PersistentFlags().StringP("configFile", "c", "", "Configuration file to use (TOML, JSON, YAML, HCL).") + traefikCmd.PersistentFlags().StringP("configFile", "c", "", "Configuration file to use (TOML).") + traefikCmd.PersistentFlags().BoolVarP(&arguments.Debug, "debug", "d", false, "Enable debug mode") traefikCmd.PersistentFlags().StringP("graceTimeOut", "g", "10", "Timeout in seconds. Duration to give active requests a chance to finish during hot-reloads") traefikCmd.PersistentFlags().String("accessLogsFile", "log/access.log", "Access logs file") traefikCmd.PersistentFlags().String("traefikLogsFile", "log/traefik.log", "Traefik logs file") @@ -178,6 +179,7 @@ func init() { _ = viper.BindPFlag("configFile", traefikCmd.PersistentFlags().Lookup("configFile")) _ = viper.BindPFlag("graceTimeOut", traefikCmd.PersistentFlags().Lookup("graceTimeOut")) _ = viper.BindPFlag("logLevel", traefikCmd.PersistentFlags().Lookup("logLevel")) + _ = viper.BindPFlag("debug", traefikCmd.PersistentFlags().Lookup("debug")) // TODO: wait for this issue to be corrected: https://github.com/spf13/viper/issues/105 _ = viper.BindPFlag("providersThrottleDuration", traefikCmd.PersistentFlags().Lookup("providersThrottleDuration")) _ = viper.BindPFlag("maxIdleConnsPerHost", traefikCmd.PersistentFlags().Lookup("maxIdleConnsPerHost")) diff --git a/configuration.go b/configuration.go index 693b74c90..bae48d7cb 100644 --- a/configuration.go +++ b/configuration.go @@ -19,6 +19,7 @@ import ( // It's populated from the traefik configuration file passed as an argument to the binary. type GlobalConfiguration struct { GraceTimeOut int64 + Debug bool AccessLogsFile string TraefikLogsFile string LogLevel string From dc404b365f6b4c03b36be66a2e60f7c25de123f4 Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Thu, 19 May 2016 20:06:49 +0200 Subject: [PATCH 2/4] Add expvar endpoint Signed-off-by: Emile Vauge --- web.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/web.go b/web.go index 445a83bce..d6093db7e 100644 --- a/web.go +++ b/web.go @@ -2,9 +2,11 @@ package main import ( "encoding/json" + "expvar" "fmt" "io/ioutil" "net/http" + "runtime" log "github.com/Sirupsen/logrus" "github.com/containous/traefik/autogen" @@ -33,6 +35,14 @@ var ( }) ) +func init() { + expvar.Publish("Goroutines", expvar.Func(goroutines)) +} + +func goroutines() interface{} { + return runtime.NumGoroutine() +} + // Provide allows the provider to provide configurations to traefik // using the given configuration channel. func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { @@ -97,6 +107,11 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag } } }() + + // expvars + if provider.server.globalConfiguration.Debug { + systemRouter.Methods("GET").Path("/debug/vars").HandlerFunc(expvarHandler) + } return nil } @@ -231,3 +246,17 @@ func (provider *WebProvider) getRouteHandler(response http.ResponseWriter, reque } http.NotFound(response, request) } + +func expvarHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=utf-8") + fmt.Fprintf(w, "{\n") + first := true + expvar.Do(func(kv expvar.KeyValue) { + if !first { + fmt.Fprintf(w, ",\n") + } + first = false + fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value) + }) + fmt.Fprintf(w, "\n}\n") +} From 2af1e4b1921d7a00cbfe63133695863029ce2cb4 Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Thu, 19 May 2016 20:07:57 +0200 Subject: [PATCH 3/4] Fix k8s compose file Signed-off-by: Emile Vauge --- examples/compose-k8s.yaml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/examples/compose-k8s.yaml b/examples/compose-k8s.yaml index e9abe96b4..626ea1d1c 100644 --- a/examples/compose-k8s.yaml +++ b/examples/compose-k8s.yaml @@ -1,8 +1,3 @@ -# etcd: -# image: gcr.io/google_containers/etcd:2.2.1 -# net: host -# command: ['/usr/local/bin/etcd', '--addr=127.0.0.1:4001', '--bind-addr=0.0.0.0:4001', '--data-dir=/var/etcd/data'] - kubelet: image: gcr.io/google_containers/hyperkube-amd64:v1.2.2 privileged: true From 0f23581f644707eca93226136cc933e5fa947d4c Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Thu, 19 May 2016 20:09:01 +0200 Subject: [PATCH 4/4] Fix k8s memory leak Signed-off-by: Emile Vauge --- provider/k8s/client.go | 56 ++++++++++++++++++++++-------------------- provider/kubernetes.go | 28 +++++++++++---------- server.go | 4 +-- web.go | 10 ++++---- 4 files changed, 52 insertions(+), 46 deletions(-) diff --git a/provider/k8s/client.go b/provider/k8s/client.go index ec089f40a..957d66a9b 100644 --- a/provider/k8s/client.go +++ b/provider/k8s/client.go @@ -5,12 +5,10 @@ import ( "crypto/x509" "encoding/json" "fmt" + "github.com/parnurzeal/gorequest" "net/http" "net/url" "strings" - - "github.com/containous/traefik/safe" - "github.com/parnurzeal/gorequest" ) const ( @@ -126,8 +124,8 @@ func (c *clientImpl) WatchReplicationControllers(stopCh <-chan bool) (chan inter // WatchAll returns events in the cluster func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { - watchCh := make(chan interface{}) - errCh := make(chan error) + watchCh := make(chan interface{}, 10) + errCh := make(chan error, 10) stopIngresses := make(chan bool) chanIngresses, chanIngressesErr, err := c.WatchIngresses(stopIngresses) @@ -164,7 +162,7 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, stopServices <- true stopPods <- true stopReplicationControllers <- true - break + return case err := <-chanIngressesErr: errCh <- err case err := <-chanServicesErr: @@ -193,6 +191,7 @@ func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) { if errs != nil { return nil, fmt.Errorf("failed to create request: GET %q : %v", request.Url, errs) } + defer res.Body.Close() if res.StatusCode != http.StatusOK { return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, request.Url, string(body)) } @@ -202,6 +201,7 @@ func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) { func (c *clientImpl) request(url string) *gorequest.SuperAgent { // Make request to Kubernetes API request := gorequest.New().Get(url) + request.Transport.DisableKeepAlives = true if strings.HasPrefix(url, "http://") { return request @@ -223,8 +223,8 @@ type GenericObject struct { } func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, chan error, error) { - watchCh := make(chan interface{}) - errCh := make(chan error) + watchCh := make(chan interface{}, 10) + errCh := make(chan error, 10) // get version body, err := c.do(c.request(url)) @@ -246,34 +246,38 @@ func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, ch return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err) } request.Client.Transport = request.Transport + res, err := request.Client.Do(req) if err != nil { return watchCh, errCh, fmt.Errorf("failed to do watch request: GET %q: %v", url, err) } - shouldStop := safe.New(false) - - go func() { - select { - case <-stopCh: - shouldStop.Set(true) - res.Body.Close() - return - } - }() - go func() { + finishCh := make(chan bool) + defer close(finishCh) defer close(watchCh) defer close(errCh) - for { - var eventList interface{} - if err := json.NewDecoder(res.Body).Decode(&eventList); err != nil { - if !shouldStop.Get().(bool) { - errCh <- fmt.Errorf("failed to decode watch event: %v", err) + go func() { + defer res.Body.Close() + for { + var eventList interface{} + if err := json.NewDecoder(res.Body).Decode(&eventList); err != nil { + if !strings.Contains(err.Error(), "net/http: request canceled") { + errCh <- fmt.Errorf("failed to decode watch event: GET %q : %v", url, err) + } + finishCh <- true + return } - return + watchCh <- eventList } - watchCh <- eventList + }() + select { + case <-stopCh: + go func() { + request.Transport.CancelRequest(req) + }() + <-finishCh + return } }() return watchCh, errCh, nil diff --git a/provider/kubernetes.go b/provider/kubernetes.go index 4af905084..0cfb75625 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -62,19 +62,20 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage backOff := backoff.NewExponentialBackOff() pool.Go(func(stop chan bool) { - stopWatch := make(chan bool) - defer close(stopWatch) operation := func() error { - select { - case <-stop: - return nil - default: - } for { + stopWatch := make(chan bool, 5) + defer close(stopWatch) eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch) if err != nil { log.Errorf("Error watching kubernetes events: %v", err) - return err + timer := time.NewTimer(1 * time.Second) + select { + case <-timer.C: + return err + case <-stop: + return nil + } } Watch: for { @@ -82,14 +83,15 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage case <-stop: stopWatch <- true return nil - case err := <-errEventsChan: - if strings.Contains(err.Error(), io.EOF.Error()) { + case err, ok := <-errEventsChan: + stopWatch <- true + if ok && strings.Contains(err.Error(), io.EOF.Error()) { // edge case, kubernetes long-polling disconnection break Watch } return err case event := <-eventsChan: - log.Debugf("Received event from kubenetes %+v", event) + log.Debugf("Received event from kubernetes %+v", event) templateObjects, err := provider.loadIngresses(k8sClient) if err != nil { return err @@ -190,13 +192,13 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur return service.ObjectMeta.Namespace == i.ObjectMeta.Namespace && service.Name == pa.Backend.ServiceName }) if err != nil { - log.Errorf("Error retrieving services: %v", err) + log.Warnf("Error retrieving services: %v", err) continue } if len(services) == 0 { // no backends found, delete frontend... delete(templateObjects.Frontends, r.Host+pa.Path) - log.Errorf("Error retrieving services %s", pa.Backend.ServiceName) + log.Warnf("Error retrieving services %s", pa.Backend.ServiceName) } for _, service := range services { protocol := "http" diff --git a/server.go b/server.go index 39a09c271..c1bb55cd0 100644 --- a/server.go +++ b/server.go @@ -68,8 +68,8 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server { server := new(Server) server.serverEntryPoints = make(map[string]*serverEntryPoint) - server.configurationChan = make(chan types.ConfigMessage, 10) - server.configurationValidatedChan = make(chan types.ConfigMessage, 10) + server.configurationChan = make(chan types.ConfigMessage, 100) + server.configurationValidatedChan = make(chan types.ConfigMessage, 100) server.signals = make(chan os.Signal, 1) server.stopChan = make(chan bool, 1) server.providers = []provider.Provider{} diff --git a/web.go b/web.go index d6093db7e..f32874aa6 100644 --- a/web.go +++ b/web.go @@ -94,6 +94,11 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag }) systemRouter.Methods("GET").PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", http.FileServer(&assetfs.AssetFS{Asset: autogen.Asset, AssetDir: autogen.AssetDir, Prefix: "static"}))) + // expvars + if provider.server.globalConfiguration.Debug { + systemRouter.Methods("GET").Path("/debug/vars").HandlerFunc(expvarHandler) + } + go func() { if len(provider.CertFile) > 0 && len(provider.KeyFile) > 0 { err := http.ListenAndServeTLS(provider.Address, provider.CertFile, provider.KeyFile, systemRouter) @@ -107,11 +112,6 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag } } }() - - // expvars - if provider.server.globalConfiguration.Debug { - systemRouter.Methods("GET").Path("/debug/vars").HandlerFunc(expvarHandler) - } return nil }