Adding caching and merging of configurations

Configurations are now cached from each provider separately so that
we can merge them together when one provider has changed config.

The Web module also returns full config info through the HTML call,
but REST API is working on a separate web configuration that is
sent in just like any other.
This commit is contained in:
Thorhallur Sverrisson 2015-09-22 04:16:21 +00:00
parent 537c5c18dd
commit f534d8817f
7 changed files with 124 additions and 88 deletions

View file

@ -68,7 +68,7 @@ func NewConsulProvider() *ConsulProvider {
return consulProvider return consulProvider
} }
func (provider *ConsulProvider) Provide(configurationChan chan<- *Configuration) { func (provider *ConsulProvider) Provide(configurationChan chan<- configMessage) {
config := &api.Config{ config := &api.Config{
Address: provider.Endpoint, Address: provider.Endpoint,
Scheme: "http", Scheme: "http",
@ -99,7 +99,7 @@ func (provider *ConsulProvider) Provide(configurationChan chan<- *Configuration)
waitIndex = meta.LastIndex waitIndex = meta.LastIndex
configuration := provider.loadConsulConfig() configuration := provider.loadConsulConfig()
if configuration != nil { if configuration != nil {
configurationChan <- configuration configurationChan <- configMessage{"consul", configuration}
} }
} }
} }
@ -107,7 +107,7 @@ func (provider *ConsulProvider) Provide(configurationChan chan<- *Configuration)
} }
} }
configuration := provider.loadConsulConfig() configuration := provider.loadConsulConfig()
configurationChan <- configuration configurationChan <- configMessage{"consul", configuration}
} }
func (provider *ConsulProvider) loadConsulConfig() *Configuration { func (provider *ConsulProvider) loadConsulConfig() *Configuration {

View file

@ -65,7 +65,7 @@ var DockerFuncMap = template.FuncMap{
"getHost": getHost, "getHost": getHost,
} }
func (provider *DockerProvider) Provide(configurationChan chan<- *Configuration) { func (provider *DockerProvider) Provide(configurationChan chan<- configMessage) {
if dockerClient, err := docker.NewClient(provider.Endpoint); err != nil { if dockerClient, err := docker.NewClient(provider.Endpoint); err != nil {
log.Fatalf("Failed to create a client for docker, error: %s", err) log.Fatalf("Failed to create a client for docker, error: %s", err)
} else { } else {
@ -90,7 +90,7 @@ func (provider *DockerProvider) Provide(configurationChan chan<- *Configuration)
log.Debugf("Docker event receveived %+v", event) log.Debugf("Docker event receveived %+v", event)
configuration := provider.loadDockerConfig(dockerClient) configuration := provider.loadDockerConfig(dockerClient)
if configuration != nil { if configuration != nil {
configurationChan <- configuration configurationChan <- configMessage{"docker", configuration}
} }
} }
} }
@ -106,7 +106,7 @@ func (provider *DockerProvider) Provide(configurationChan chan<- *Configuration)
} }
configuration := provider.loadDockerConfig(dockerClient) configuration := provider.loadDockerConfig(dockerClient)
configurationChan <- configuration configurationChan <- configMessage{"docker", configuration}
} }
} }

View file

@ -23,7 +23,7 @@ func NewFileProvider() *FileProvider {
return fileProvider return fileProvider
} }
func (provider *FileProvider) Provide(configurationChan chan<- *Configuration) { func (provider *FileProvider) Provide(configurationChan chan<- configMessage) {
watcher, err := fsnotify.NewWatcher() watcher, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
log.Error("Error creating file watcher", err) log.Error("Error creating file watcher", err)
@ -48,7 +48,7 @@ func (provider *FileProvider) Provide(configurationChan chan<- *Configuration) {
log.Debug("File event:", event) log.Debug("File event:", event)
configuration := provider.LoadFileConfig(file.Name()) configuration := provider.LoadFileConfig(file.Name())
if configuration != nil { if configuration != nil {
configurationChan <- configuration configurationChan <- configMessage{"file", configuration}
} }
} }
case error := <-watcher.Errors: case error := <-watcher.Errors:
@ -67,7 +67,7 @@ func (provider *FileProvider) Provide(configurationChan chan<- *Configuration) {
} }
configuration := provider.LoadFileConfig(file.Name()) configuration := provider.LoadFileConfig(file.Name())
configurationChan <- configuration configurationChan <- configMessage{"file", configuration}
<-done <-done
} }

View file

@ -67,7 +67,7 @@ var MarathonFuncMap = template.FuncMap{
}, },
} }
func (provider *MarathonProvider) Provide(configurationChan chan<- *Configuration) { func (provider *MarathonProvider) Provide(configurationChan chan<- configMessage) {
config := marathon.NewDefaultConfig() config := marathon.NewDefaultConfig()
config.URL = provider.Endpoint config.URL = provider.Endpoint
config.EventsInterface = provider.NetworkInterface config.EventsInterface = provider.NetworkInterface
@ -88,7 +88,7 @@ func (provider *MarathonProvider) Provide(configurationChan chan<- *Configuratio
log.Debug("Marathon event receveived", event) log.Debug("Marathon event receveived", event)
configuration := provider.loadMarathonConfig() configuration := provider.loadMarathonConfig()
if configuration != nil { if configuration != nil {
configurationChan <- configuration configurationChan <- configMessage{"marathon", configuration}
} }
} }
}() }()
@ -96,7 +96,7 @@ func (provider *MarathonProvider) Provide(configurationChan chan<- *Configuratio
} }
configuration := provider.loadMarathonConfig() configuration := provider.loadMarathonConfig()
configurationChan <- configuration configurationChan <- configMessage{"marathon", configuration}
} }
func (provider *MarathonProvider) loadMarathonConfig() *Configuration { func (provider *MarathonProvider) loadMarathonConfig() *Configuration {

View file

@ -1,5 +1,5 @@
package main package main
type Provider interface { type Provider interface {
Provide(configurationChan chan<- *Configuration) Provide(configurationChan chan<- configMessage)
} }

View file

@ -28,24 +28,31 @@ import (
) )
var ( var (
globalConfigFile = kingpin.Arg("conf", "Main configration file.").Default("traefik.toml").String() globalConfigFile = kingpin.Arg("conf", "Main configration file.").Default("traefik.toml").String()
currentConfiguration = new(Configuration) currentConfigurations = make(configs)
metrics = stats.New() metrics = stats.New()
oxyLogger = &OxyLogger{} oxyLogger = &OxyLogger{}
templatesRenderer = render.New(render.Options{ templatesRenderer = render.New(render.Options{
Directory: "templates", Directory: "templates",
Asset: Asset, Asset: Asset,
AssetNames: AssetNames, AssetNames: AssetNames,
}) })
) )
type configMessage struct {
providerName string
configuration *Configuration
}
type configs map[string]*Configuration
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
kingpin.Parse() kingpin.Parse()
fmtlog.SetFlags(fmtlog.Lshortfile | fmtlog.LstdFlags) fmtlog.SetFlags(fmtlog.Lshortfile | fmtlog.LstdFlags)
var srv *manners.GracefulServer var srv *manners.GracefulServer
var configurationRouter *mux.Router var configurationRouter *mux.Router
var configurationChan = make(chan *Configuration, 10) var configurationChan = make(chan configMessage, 10)
defer close(configurationChan) defer close(configurationChan)
var sigs = make(chan os.Signal, 1) var sigs = make(chan os.Signal, 1)
defer close(sigs) defer close(sigs)
@ -84,17 +91,25 @@ func main() {
// listen new configurations from providers // listen new configurations from providers
go func() { go func() {
for { for {
configuration := <-configurationChan configMsg := <-configurationChan
log.Infof("Configuration receveived %+v", configuration) log.Infof("Configuration receveived from provider %v: %+v", configMsg.providerName, configMsg.configuration)
if configuration == nil { if configMsg.configuration == nil {
log.Info("Skipping empty configuration") log.Info("Skipping empty configuration")
} else if reflect.DeepEqual(currentConfiguration, configuration) { } else if reflect.DeepEqual(currentConfigurations[configMsg.providerName], configMsg.configuration) {
log.Info("Skipping same configuration") log.Info("Skipping same configuration")
} else { } else {
newConfigurationRouter, err := LoadConfig(configuration, globalConfiguration) // Copy configurations to new map so we don't change current if LoadConfig fails
newConfigurations := make(configs)
for k, v := range currentConfigurations {
newConfigurations[k] = v
}
newConfigurations[configMsg.providerName] = configMsg.configuration
newConfigurationRouter, err := LoadConfig(newConfigurations, globalConfiguration)
if err == nil { if err == nil {
currentConfiguration = configuration currentConfigurations = newConfigurations
configurationRouter = newConfigurationRouter configurationRouter = newConfigurationRouter
oldServer := srv oldServer := srv
newsrv := prepareServer(configurationRouter, globalConfiguration, oldServer, loggerMiddleware, metrics) newsrv := prepareServer(configurationRouter, globalConfiguration, oldServer, loggerMiddleware, metrics)
@ -210,69 +225,71 @@ func prepareServer(router *mux.Router, globalConfiguration *GlobalConfiguration,
} }
} }
func LoadConfig(configuration *Configuration, globalConfiguration *GlobalConfiguration) (*mux.Router, error) { func LoadConfig(configurations configs, globalConfiguration *GlobalConfiguration) (*mux.Router, error) {
router := mux.NewRouter() router := mux.NewRouter()
router.NotFoundHandler = http.HandlerFunc(notFoundHandler) router.NotFoundHandler = http.HandlerFunc(notFoundHandler)
backends := map[string]http.Handler{} backends := map[string]http.Handler{}
for frontendName, frontend := range configuration.Frontends { for _, configuration := range configurations {
log.Debugf("Creating frontend %s", frontendName) for frontendName, frontend := range configuration.Frontends {
fwd, _ := forward.New(forward.Logger(oxyLogger)) log.Debugf("Creating frontend %s", frontendName)
newRoute := router.NewRoute().Name(frontendName) fwd, _ := forward.New(forward.Logger(oxyLogger))
for routeName, route := range frontend.Routes { newRoute := router.NewRoute().Name(frontendName)
log.Debugf("Creating route %s %s:%s", routeName, route.Rule, route.Value) for routeName, route := range frontend.Routes {
newRouteReflect := Invoke(newRoute, route.Rule, route.Value) log.Debugf("Creating route %s %s:%s", routeName, route.Rule, route.Value)
newRoute = newRouteReflect[0].Interface().(*mux.Route) newRouteReflect := Invoke(newRoute, route.Rule, route.Value)
} newRoute = newRouteReflect[0].Interface().(*mux.Route)
if backends[frontend.Backend] == nil {
log.Debugf("Creating backend %s", frontend.Backend)
var lb http.Handler
rr, _ := roundrobin.New(fwd)
lbMethod, err := NewLoadBalancerMethod(configuration.Backends[frontend.Backend].LoadBalancer)
if err != nil {
configuration.Backends[frontend.Backend].LoadBalancer = &LoadBalancer{Method: "wrr"}
} }
switch lbMethod { if backends[frontend.Backend] == nil {
case drr: log.Debugf("Creating backend %s", frontend.Backend)
log.Debugf("Creating load-balancer drr") var lb http.Handler
rebalancer, _ := roundrobin.NewRebalancer(rr, roundrobin.RebalancerLogger(oxyLogger)) rr, _ := roundrobin.New(fwd)
lb = rebalancer lbMethod, err := NewLoadBalancerMethod(configuration.Backends[frontend.Backend].LoadBalancer)
for serverName, server := range configuration.Backends[frontend.Backend].Servers { if err != nil {
url, err := url.Parse(server.URL) configuration.Backends[frontend.Backend].LoadBalancer = &LoadBalancer{Method: "wrr"}
if err != nil {
return nil, err
}
log.Debugf("Creating server %s %s", serverName, url.String())
rebalancer.UpsertServer(url, roundrobin.Weight(server.Weight))
} }
case wrr: switch lbMethod {
log.Debugf("Creating load-balancer wrr") case drr:
lb = rr log.Debugf("Creating load-balancer drr")
for serverName, server := range configuration.Backends[frontend.Backend].Servers { rebalancer, _ := roundrobin.NewRebalancer(rr, roundrobin.RebalancerLogger(oxyLogger))
url, err := url.Parse(server.URL) lb = rebalancer
if err != nil { for serverName, server := range configuration.Backends[frontend.Backend].Servers {
return nil, err url, err := url.Parse(server.URL)
if err != nil {
return nil, err
}
log.Debugf("Creating server %s %s", serverName, url.String())
rebalancer.UpsertServer(url, roundrobin.Weight(server.Weight))
}
case wrr:
log.Debugf("Creating load-balancer wrr")
lb = rr
for serverName, server := range configuration.Backends[frontend.Backend].Servers {
url, err := url.Parse(server.URL)
if err != nil {
return nil, err
}
log.Debugf("Creating server %s %s", serverName, url.String())
rr.UpsertServer(url, roundrobin.Weight(server.Weight))
} }
log.Debugf("Creating server %s %s", serverName, url.String())
rr.UpsertServer(url, roundrobin.Weight(server.Weight))
} }
} var negroni = negroni.New()
var negroni = negroni.New() if configuration.Backends[frontend.Backend].CircuitBreaker != nil {
if configuration.Backends[frontend.Backend].CircuitBreaker != nil { log.Debugf("Creating circuit breaker %s", configuration.Backends[frontend.Backend].CircuitBreaker.Expression)
log.Debugf("Creating circuit breaker %s", configuration.Backends[frontend.Backend].CircuitBreaker.Expression) negroni.Use(middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger)))
negroni.Use(middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger))) } else {
negroni.UseHandler(lb)
}
backends[frontend.Backend] = negroni
} else { } else {
negroni.UseHandler(lb) log.Debugf("Reusing backend %s", frontend.Backend)
} }
backends[frontend.Backend] = negroni // stream.New(backends[frontend.Backend], stream.Retry("IsNetworkError() && Attempts() <= " + strconv.Itoa(globalConfiguration.Replay)), stream.Logger(oxyLogger))
} else {
log.Debugf("Reusing backend %s", frontend.Backend)
}
// stream.New(backends[frontend.Backend], stream.Retry("IsNetworkError() && Attempts() <= " + strconv.Itoa(globalConfiguration.Replay)), stream.Logger(oxyLogger))
newRoute.Handler(backends[frontend.Backend]) newRoute.Handler(backends[frontend.Backend])
err := newRoute.GetError() err := newRoute.GetError()
if err != nil { if err != nil {
log.Error("Error building route: %s", err) log.Error("Error building route: %s", err)
}
} }
} }
return router, nil return router, nil

39
web.go
View file

@ -11,6 +11,10 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
var (
webConfiguration *Configuration
)
type WebProvider struct { type WebProvider struct {
Address string Address string
CertFile, KeyFile string CertFile, KeyFile string
@ -20,7 +24,7 @@ type Page struct {
Configuration Configuration Configuration Configuration
} }
func (provider *WebProvider) Provide(configurationChan chan<- *Configuration) { func (provider *WebProvider) Provide(configurationChan chan<- configMessage) {
systemRouter := mux.NewRouter() systemRouter := mux.NewRouter()
systemRouter.Methods("GET").Path("/").Handler(http.HandlerFunc(GetHTMLConfigHandler)) systemRouter.Methods("GET").Path("/").Handler(http.HandlerFunc(GetHTMLConfigHandler))
systemRouter.Methods("GET").Path("/health").Handler(http.HandlerFunc(GetHealthHandler)) systemRouter.Methods("GET").Path("/health").Handler(http.HandlerFunc(GetHealthHandler))
@ -31,7 +35,7 @@ func (provider *WebProvider) Provide(configurationChan chan<- *Configuration) {
b, _ := ioutil.ReadAll(r.Body) b, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(b, configuration) err := json.Unmarshal(b, configuration)
if err == nil { if err == nil {
configurationChan <- configuration configurationChan <- configMessage{"web", configuration}
GetConfigHandler(rw, r) GetConfigHandler(rw, r)
} else { } else {
log.Errorf("Error parsing configuration %+v", err) log.Errorf("Error parsing configuration %+v", err)
@ -62,11 +66,26 @@ func (provider *WebProvider) Provide(configurationChan chan<- *Configuration) {
} }
func GetConfigHandler(rw http.ResponseWriter, r *http.Request) { func GetConfigHandler(rw http.ResponseWriter, r *http.Request) {
templatesRenderer.JSON(rw, http.StatusOK, currentConfiguration) templatesRenderer.JSON(rw, http.StatusOK, webConfiguration)
} }
func GetHTMLConfigHandler(response http.ResponseWriter, request *http.Request) { func GetHTMLConfigHandler(response http.ResponseWriter, request *http.Request) {
templatesRenderer.HTML(response, http.StatusOK, "configuration", Page{Configuration: *currentConfiguration}) var cfg Configuration
cfg.Backends = make(map[string]*Backend)
cfg.Frontends = make(map[string]*Frontend)
// Quick and dirty merge of config for display
for _, config := range currentConfigurations {
for name, config := range config.Backends {
cfg.Backends[name] = config
}
for name, config := range config.Frontends {
cfg.Frontends[name] = config
}
}
templatesRenderer.HTML(response, http.StatusOK, "configuration", Page{Configuration: cfg})
} }
func GetHealthHandler(rw http.ResponseWriter, r *http.Request) { func GetHealthHandler(rw http.ResponseWriter, r *http.Request) {
@ -74,13 +93,13 @@ func GetHealthHandler(rw http.ResponseWriter, r *http.Request) {
} }
func GetBackendsHandler(rw http.ResponseWriter, r *http.Request) { func GetBackendsHandler(rw http.ResponseWriter, r *http.Request) {
templatesRenderer.JSON(rw, http.StatusOK, currentConfiguration.Backends) templatesRenderer.JSON(rw, http.StatusOK, webConfiguration.Backends)
} }
func GetBackendHandler(rw http.ResponseWriter, r *http.Request) { func GetBackendHandler(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
id := vars["backend"] id := vars["backend"]
if backend, ok := currentConfiguration.Backends[id]; ok { if backend, ok := webConfiguration.Backends[id]; ok {
templatesRenderer.JSON(rw, http.StatusOK, backend) templatesRenderer.JSON(rw, http.StatusOK, backend)
} else { } else {
http.NotFound(rw, r) http.NotFound(rw, r)
@ -88,13 +107,13 @@ func GetBackendHandler(rw http.ResponseWriter, r *http.Request) {
} }
func GetFrontendsHandler(rw http.ResponseWriter, r *http.Request) { func GetFrontendsHandler(rw http.ResponseWriter, r *http.Request) {
templatesRenderer.JSON(rw, http.StatusOK, currentConfiguration.Frontends) templatesRenderer.JSON(rw, http.StatusOK, webConfiguration.Frontends)
} }
func GetFrontendHandler(rw http.ResponseWriter, r *http.Request) { func GetFrontendHandler(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
id := vars["frontend"] id := vars["frontend"]
if frontend, ok := currentConfiguration.Frontends[id]; ok { if frontend, ok := webConfiguration.Frontends[id]; ok {
templatesRenderer.JSON(rw, http.StatusOK, frontend) templatesRenderer.JSON(rw, http.StatusOK, frontend)
} else { } else {
http.NotFound(rw, r) http.NotFound(rw, r)
@ -104,7 +123,7 @@ func GetFrontendHandler(rw http.ResponseWriter, r *http.Request) {
func GetServersHandler(rw http.ResponseWriter, r *http.Request) { func GetServersHandler(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
backend := vars["backend"] backend := vars["backend"]
if backend, ok := currentConfiguration.Backends[backend]; ok { if backend, ok := webConfiguration.Backends[backend]; ok {
templatesRenderer.JSON(rw, http.StatusOK, backend.Servers) templatesRenderer.JSON(rw, http.StatusOK, backend.Servers)
} else { } else {
http.NotFound(rw, r) http.NotFound(rw, r)
@ -115,7 +134,7 @@ func GetServerHandler(rw http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
backend := vars["backend"] backend := vars["backend"]
server := vars["server"] server := vars["server"]
if backend, ok := currentConfiguration.Backends[backend]; ok { if backend, ok := webConfiguration.Backends[backend]; ok {
if server, ok := backend.Servers[server]; ok { if server, ok := backend.Servers[server]; ok {
templatesRenderer.JSON(rw, http.StatusOK, server) templatesRenderer.JSON(rw, http.StatusOK, server)
} else { } else {