From f534d8817f9530736b62cc0c3646ed9ef305f709 Mon Sep 17 00:00:00 2001 From: Thorhallur Sverrisson Date: Tue, 22 Sep 2015 04:16:21 +0000 Subject: [PATCH] Adding caching and merging of configurations Configurations are now cached from each provider separately so that we can merge them together when one provider has changed config. The Web module also returns full config info through the HTML call, but REST API is working on a separate web configuration that is sent in just like any other. --- consul.go | 6 +-- docker.go | 6 +-- file.go | 6 +-- marathon.go | 6 +-- provider.go | 2 +- traefik.go | 147 +++++++++++++++++++++++++++++----------------------- web.go | 39 ++++++++++---- 7 files changed, 124 insertions(+), 88 deletions(-) diff --git a/consul.go b/consul.go index 849fefc97..344f8f1ec 100644 --- a/consul.go +++ b/consul.go @@ -68,7 +68,7 @@ func NewConsulProvider() *ConsulProvider { return consulProvider } -func (provider *ConsulProvider) Provide(configurationChan chan<- *Configuration) { +func (provider *ConsulProvider) Provide(configurationChan chan<- configMessage) { config := &api.Config{ Address: provider.Endpoint, Scheme: "http", @@ -99,7 +99,7 @@ func (provider *ConsulProvider) Provide(configurationChan chan<- *Configuration) waitIndex = meta.LastIndex configuration := provider.loadConsulConfig() if configuration != nil { - configurationChan <- configuration + configurationChan <- configMessage{"consul", configuration} } } } @@ -107,7 +107,7 @@ func (provider *ConsulProvider) Provide(configurationChan chan<- *Configuration) } } configuration := provider.loadConsulConfig() - configurationChan <- configuration + configurationChan <- configMessage{"consul", configuration} } func (provider *ConsulProvider) loadConsulConfig() *Configuration { diff --git a/docker.go b/docker.go index 625e2634b..4ae6c2209 100644 --- a/docker.go +++ b/docker.go @@ -65,7 +65,7 @@ var DockerFuncMap = template.FuncMap{ "getHost": getHost, } -func (provider *DockerProvider) Provide(configurationChan chan<- *Configuration) { +func (provider *DockerProvider) Provide(configurationChan chan<- configMessage) { if dockerClient, err := docker.NewClient(provider.Endpoint); err != nil { log.Fatalf("Failed to create a client for docker, error: %s", err) } else { @@ -90,7 +90,7 @@ func (provider *DockerProvider) Provide(configurationChan chan<- *Configuration) log.Debugf("Docker event receveived %+v", event) configuration := provider.loadDockerConfig(dockerClient) if configuration != nil { - configurationChan <- configuration + configurationChan <- configMessage{"docker", configuration} } } } @@ -106,7 +106,7 @@ func (provider *DockerProvider) Provide(configurationChan chan<- *Configuration) } configuration := provider.loadDockerConfig(dockerClient) - configurationChan <- configuration + configurationChan <- configMessage{"docker", configuration} } } diff --git a/file.go b/file.go index ee2a5509a..4cdd4704a 100644 --- a/file.go +++ b/file.go @@ -23,7 +23,7 @@ func NewFileProvider() *FileProvider { return fileProvider } -func (provider *FileProvider) Provide(configurationChan chan<- *Configuration) { +func (provider *FileProvider) Provide(configurationChan chan<- configMessage) { watcher, err := fsnotify.NewWatcher() if err != nil { log.Error("Error creating file watcher", err) @@ -48,7 +48,7 @@ func (provider *FileProvider) Provide(configurationChan chan<- *Configuration) { log.Debug("File event:", event) configuration := provider.LoadFileConfig(file.Name()) if configuration != nil { - configurationChan <- configuration + configurationChan <- configMessage{"file", configuration} } } case error := <-watcher.Errors: @@ -67,7 +67,7 @@ func (provider *FileProvider) Provide(configurationChan chan<- *Configuration) { } configuration := provider.LoadFileConfig(file.Name()) - configurationChan <- configuration + configurationChan <- configMessage{"file", configuration} <-done } diff --git a/marathon.go b/marathon.go index 029edf864..9cb7fe12e 100644 --- a/marathon.go +++ b/marathon.go @@ -67,7 +67,7 @@ var MarathonFuncMap = template.FuncMap{ }, } -func (provider *MarathonProvider) Provide(configurationChan chan<- *Configuration) { +func (provider *MarathonProvider) Provide(configurationChan chan<- configMessage) { config := marathon.NewDefaultConfig() config.URL = provider.Endpoint config.EventsInterface = provider.NetworkInterface @@ -88,7 +88,7 @@ func (provider *MarathonProvider) Provide(configurationChan chan<- *Configuratio log.Debug("Marathon event receveived", event) configuration := provider.loadMarathonConfig() if configuration != nil { - configurationChan <- configuration + configurationChan <- configMessage{"marathon", configuration} } } }() @@ -96,7 +96,7 @@ func (provider *MarathonProvider) Provide(configurationChan chan<- *Configuratio } configuration := provider.loadMarathonConfig() - configurationChan <- configuration + configurationChan <- configMessage{"marathon", configuration} } func (provider *MarathonProvider) loadMarathonConfig() *Configuration { diff --git a/provider.go b/provider.go index 1d08df459..680341827 100644 --- a/provider.go +++ b/provider.go @@ -1,5 +1,5 @@ package main type Provider interface { - Provide(configurationChan chan<- *Configuration) + Provide(configurationChan chan<- configMessage) } diff --git a/traefik.go b/traefik.go index 1a0b8ede7..00a2ae8a4 100644 --- a/traefik.go +++ b/traefik.go @@ -28,24 +28,31 @@ import ( ) var ( - globalConfigFile = kingpin.Arg("conf", "Main configration file.").Default("traefik.toml").String() - currentConfiguration = new(Configuration) - metrics = stats.New() - oxyLogger = &OxyLogger{} - templatesRenderer = render.New(render.Options{ + globalConfigFile = kingpin.Arg("conf", "Main configration file.").Default("traefik.toml").String() + currentConfigurations = make(configs) + metrics = stats.New() + oxyLogger = &OxyLogger{} + templatesRenderer = render.New(render.Options{ Directory: "templates", Asset: Asset, AssetNames: AssetNames, }) ) +type configMessage struct { + providerName string + configuration *Configuration +} + +type configs map[string]*Configuration + func main() { runtime.GOMAXPROCS(runtime.NumCPU()) kingpin.Parse() fmtlog.SetFlags(fmtlog.Lshortfile | fmtlog.LstdFlags) var srv *manners.GracefulServer var configurationRouter *mux.Router - var configurationChan = make(chan *Configuration, 10) + var configurationChan = make(chan configMessage, 10) defer close(configurationChan) var sigs = make(chan os.Signal, 1) defer close(sigs) @@ -84,17 +91,25 @@ func main() { // listen new configurations from providers go func() { + for { - configuration := <-configurationChan - log.Infof("Configuration receveived %+v", configuration) - if configuration == nil { + configMsg := <-configurationChan + log.Infof("Configuration receveived from provider %v: %+v", configMsg.providerName, configMsg.configuration) + if configMsg.configuration == nil { log.Info("Skipping empty configuration") - } else if reflect.DeepEqual(currentConfiguration, configuration) { + } else if reflect.DeepEqual(currentConfigurations[configMsg.providerName], configMsg.configuration) { log.Info("Skipping same configuration") } else { - newConfigurationRouter, err := LoadConfig(configuration, globalConfiguration) + // Copy configurations to new map so we don't change current if LoadConfig fails + newConfigurations := make(configs) + for k, v := range currentConfigurations { + newConfigurations[k] = v + } + newConfigurations[configMsg.providerName] = configMsg.configuration + + newConfigurationRouter, err := LoadConfig(newConfigurations, globalConfiguration) if err == nil { - currentConfiguration = configuration + currentConfigurations = newConfigurations configurationRouter = newConfigurationRouter oldServer := srv newsrv := prepareServer(configurationRouter, globalConfiguration, oldServer, loggerMiddleware, metrics) @@ -210,69 +225,71 @@ func prepareServer(router *mux.Router, globalConfiguration *GlobalConfiguration, } } -func LoadConfig(configuration *Configuration, globalConfiguration *GlobalConfiguration) (*mux.Router, error) { +func LoadConfig(configurations configs, globalConfiguration *GlobalConfiguration) (*mux.Router, error) { router := mux.NewRouter() router.NotFoundHandler = http.HandlerFunc(notFoundHandler) backends := map[string]http.Handler{} - for frontendName, frontend := range configuration.Frontends { - log.Debugf("Creating frontend %s", frontendName) - fwd, _ := forward.New(forward.Logger(oxyLogger)) - newRoute := router.NewRoute().Name(frontendName) - for routeName, route := range frontend.Routes { - log.Debugf("Creating route %s %s:%s", routeName, route.Rule, route.Value) - newRouteReflect := Invoke(newRoute, route.Rule, route.Value) - newRoute = newRouteReflect[0].Interface().(*mux.Route) - } - if backends[frontend.Backend] == nil { - log.Debugf("Creating backend %s", frontend.Backend) - var lb http.Handler - rr, _ := roundrobin.New(fwd) - lbMethod, err := NewLoadBalancerMethod(configuration.Backends[frontend.Backend].LoadBalancer) - if err != nil { - configuration.Backends[frontend.Backend].LoadBalancer = &LoadBalancer{Method: "wrr"} + for _, configuration := range configurations { + for frontendName, frontend := range configuration.Frontends { + log.Debugf("Creating frontend %s", frontendName) + fwd, _ := forward.New(forward.Logger(oxyLogger)) + newRoute := router.NewRoute().Name(frontendName) + for routeName, route := range frontend.Routes { + log.Debugf("Creating route %s %s:%s", routeName, route.Rule, route.Value) + newRouteReflect := Invoke(newRoute, route.Rule, route.Value) + newRoute = newRouteReflect[0].Interface().(*mux.Route) } - switch lbMethod { - case drr: - log.Debugf("Creating load-balancer drr") - rebalancer, _ := roundrobin.NewRebalancer(rr, roundrobin.RebalancerLogger(oxyLogger)) - lb = rebalancer - for serverName, server := range configuration.Backends[frontend.Backend].Servers { - url, err := url.Parse(server.URL) - if err != nil { - return nil, err - } - log.Debugf("Creating server %s %s", serverName, url.String()) - rebalancer.UpsertServer(url, roundrobin.Weight(server.Weight)) + if backends[frontend.Backend] == nil { + log.Debugf("Creating backend %s", frontend.Backend) + var lb http.Handler + rr, _ := roundrobin.New(fwd) + lbMethod, err := NewLoadBalancerMethod(configuration.Backends[frontend.Backend].LoadBalancer) + if err != nil { + configuration.Backends[frontend.Backend].LoadBalancer = &LoadBalancer{Method: "wrr"} } - case wrr: - log.Debugf("Creating load-balancer wrr") - lb = rr - for serverName, server := range configuration.Backends[frontend.Backend].Servers { - url, err := url.Parse(server.URL) - if err != nil { - return nil, err + switch lbMethod { + case drr: + log.Debugf("Creating load-balancer drr") + rebalancer, _ := roundrobin.NewRebalancer(rr, roundrobin.RebalancerLogger(oxyLogger)) + lb = rebalancer + for serverName, server := range configuration.Backends[frontend.Backend].Servers { + url, err := url.Parse(server.URL) + if err != nil { + return nil, err + } + log.Debugf("Creating server %s %s", serverName, url.String()) + rebalancer.UpsertServer(url, roundrobin.Weight(server.Weight)) + } + case wrr: + log.Debugf("Creating load-balancer wrr") + lb = rr + for serverName, server := range configuration.Backends[frontend.Backend].Servers { + url, err := url.Parse(server.URL) + if err != nil { + return nil, err + } + log.Debugf("Creating server %s %s", serverName, url.String()) + rr.UpsertServer(url, roundrobin.Weight(server.Weight)) } - log.Debugf("Creating server %s %s", serverName, url.String()) - rr.UpsertServer(url, roundrobin.Weight(server.Weight)) } - } - var negroni = negroni.New() - if configuration.Backends[frontend.Backend].CircuitBreaker != nil { - log.Debugf("Creating circuit breaker %s", configuration.Backends[frontend.Backend].CircuitBreaker.Expression) - negroni.Use(middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger))) + var negroni = negroni.New() + if configuration.Backends[frontend.Backend].CircuitBreaker != nil { + log.Debugf("Creating circuit breaker %s", configuration.Backends[frontend.Backend].CircuitBreaker.Expression) + negroni.Use(middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger))) + } else { + negroni.UseHandler(lb) + } + backends[frontend.Backend] = negroni } else { - negroni.UseHandler(lb) + log.Debugf("Reusing backend %s", frontend.Backend) } - backends[frontend.Backend] = negroni - } else { - log.Debugf("Reusing backend %s", frontend.Backend) - } - // stream.New(backends[frontend.Backend], stream.Retry("IsNetworkError() && Attempts() <= " + strconv.Itoa(globalConfiguration.Replay)), stream.Logger(oxyLogger)) + // stream.New(backends[frontend.Backend], stream.Retry("IsNetworkError() && Attempts() <= " + strconv.Itoa(globalConfiguration.Replay)), stream.Logger(oxyLogger)) - newRoute.Handler(backends[frontend.Backend]) - err := newRoute.GetError() - if err != nil { - log.Error("Error building route: %s", err) + newRoute.Handler(backends[frontend.Backend]) + err := newRoute.GetError() + if err != nil { + log.Error("Error building route: %s", err) + } } } return router, nil diff --git a/web.go b/web.go index 1151ceeb3..ff6ad7c34 100644 --- a/web.go +++ b/web.go @@ -11,6 +11,10 @@ import ( "github.com/gorilla/mux" ) +var ( + webConfiguration *Configuration +) + type WebProvider struct { Address string CertFile, KeyFile string @@ -20,7 +24,7 @@ type Page struct { Configuration Configuration } -func (provider *WebProvider) Provide(configurationChan chan<- *Configuration) { +func (provider *WebProvider) Provide(configurationChan chan<- configMessage) { systemRouter := mux.NewRouter() systemRouter.Methods("GET").Path("/").Handler(http.HandlerFunc(GetHTMLConfigHandler)) systemRouter.Methods("GET").Path("/health").Handler(http.HandlerFunc(GetHealthHandler)) @@ -31,7 +35,7 @@ func (provider *WebProvider) Provide(configurationChan chan<- *Configuration) { b, _ := ioutil.ReadAll(r.Body) err := json.Unmarshal(b, configuration) if err == nil { - configurationChan <- configuration + configurationChan <- configMessage{"web", configuration} GetConfigHandler(rw, r) } else { log.Errorf("Error parsing configuration %+v", err) @@ -62,11 +66,26 @@ func (provider *WebProvider) Provide(configurationChan chan<- *Configuration) { } func GetConfigHandler(rw http.ResponseWriter, r *http.Request) { - templatesRenderer.JSON(rw, http.StatusOK, currentConfiguration) + templatesRenderer.JSON(rw, http.StatusOK, webConfiguration) } func GetHTMLConfigHandler(response http.ResponseWriter, request *http.Request) { - templatesRenderer.HTML(response, http.StatusOK, "configuration", Page{Configuration: *currentConfiguration}) + var cfg Configuration + cfg.Backends = make(map[string]*Backend) + cfg.Frontends = make(map[string]*Frontend) + + // Quick and dirty merge of config for display + for _, config := range currentConfigurations { + for name, config := range config.Backends { + cfg.Backends[name] = config + } + + for name, config := range config.Frontends { + cfg.Frontends[name] = config + } + } + + templatesRenderer.HTML(response, http.StatusOK, "configuration", Page{Configuration: cfg}) } func GetHealthHandler(rw http.ResponseWriter, r *http.Request) { @@ -74,13 +93,13 @@ func GetHealthHandler(rw http.ResponseWriter, r *http.Request) { } func GetBackendsHandler(rw http.ResponseWriter, r *http.Request) { - templatesRenderer.JSON(rw, http.StatusOK, currentConfiguration.Backends) + templatesRenderer.JSON(rw, http.StatusOK, webConfiguration.Backends) } func GetBackendHandler(rw http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) id := vars["backend"] - if backend, ok := currentConfiguration.Backends[id]; ok { + if backend, ok := webConfiguration.Backends[id]; ok { templatesRenderer.JSON(rw, http.StatusOK, backend) } else { http.NotFound(rw, r) @@ -88,13 +107,13 @@ func GetBackendHandler(rw http.ResponseWriter, r *http.Request) { } func GetFrontendsHandler(rw http.ResponseWriter, r *http.Request) { - templatesRenderer.JSON(rw, http.StatusOK, currentConfiguration.Frontends) + templatesRenderer.JSON(rw, http.StatusOK, webConfiguration.Frontends) } func GetFrontendHandler(rw http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) id := vars["frontend"] - if frontend, ok := currentConfiguration.Frontends[id]; ok { + if frontend, ok := webConfiguration.Frontends[id]; ok { templatesRenderer.JSON(rw, http.StatusOK, frontend) } else { http.NotFound(rw, r) @@ -104,7 +123,7 @@ func GetFrontendHandler(rw http.ResponseWriter, r *http.Request) { func GetServersHandler(rw http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) backend := vars["backend"] - if backend, ok := currentConfiguration.Backends[backend]; ok { + if backend, ok := webConfiguration.Backends[backend]; ok { templatesRenderer.JSON(rw, http.StatusOK, backend.Servers) } else { http.NotFound(rw, r) @@ -115,7 +134,7 @@ func GetServerHandler(rw http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) backend := vars["backend"] server := vars["server"] - if backend, ok := currentConfiguration.Backends[backend]; ok { + if backend, ok := webConfiguration.Backends[backend]; ok { if server, ok := backend.Servers[server]; ok { templatesRenderer.JSON(rw, http.StatusOK, server) } else {