From a0b15a0efdc8be79c41ce9bee38d8571835da2a1 Mon Sep 17 00:00:00 2001 From: emile Date: Wed, 13 Jan 2016 22:45:49 +0100 Subject: [PATCH] Main file refactoring, extract a Server object --- adapters.go | 2 +- glide.yaml | 4 + server.go | 369 ++++++++++++++++++++++++++++++++++++++++++++++++++++ traefik.go | 325 +-------------------------------------------- web.go | 69 +++++----- 5 files changed, 416 insertions(+), 353 deletions(-) create mode 100644 server.go diff --git a/adapters.go b/adapters.go index bebe4e7d1..dcd7e5f84 100644 --- a/adapters.go +++ b/adapters.go @@ -35,7 +35,7 @@ func notFoundHandler(w http.ResponseWriter, r *http.Request) { } // LoadDefaultConfig returns a default gorrilla.mux router from the specified configuration. -func LoadDefaultConfig(globalConfiguration *GlobalConfiguration) *mux.Router { +func LoadDefaultConfig(globalConfiguration GlobalConfiguration) *mux.Router { router := mux.NewRouter() router.NotFoundHandler = http.HandlerFunc(notFoundHandler) return router diff --git a/glide.yaml b/glide.yaml index fd31cc94f..65400739f 100644 --- a/glide.yaml +++ b/glide.yaml @@ -146,4 +146,8 @@ import: ref: d8a3071799b98cacd30b6da92f536050ccfe6da4 - package: github.com/golang/glog ref: fca8c8854093a154ff1eb580aae10276ad6b1b5f + - package: github.com/spf13/cast + version: ee7b3e0353166ab1f3a605294ac8cd2b77953778 + - package: github.com/spf13/viper + version: a212099cbe6fbe8d07476bfda8d2d39b6ff8f325 diff --git a/server.go b/server.go new file mode 100644 index 000000000..d273d3d43 --- /dev/null +++ b/server.go @@ -0,0 +1,369 @@ +/* +Copyright +*/ +package main + +import ( + "crypto/tls" + "errors" + log "github.com/Sirupsen/logrus" + "github.com/codegangsta/negroni" + "github.com/emilevauge/traefik/middlewares" + "github.com/emilevauge/traefik/provider" + "github.com/emilevauge/traefik/types" + "github.com/gorilla/mux" + "github.com/mailgun/manners" + "github.com/mailgun/oxy/cbreaker" + "github.com/mailgun/oxy/forward" + "github.com/mailgun/oxy/roundrobin" + "net/http" + "net/url" + "os" + "os/signal" + "reflect" + "sync" + "syscall" + "time" +) + +// Server is the reverse-proxy/load-balancer engine +type Server struct { + srv *manners.GracefulServer + configurationRouter *mux.Router + configurationChan chan types.ConfigMessage + configurationChanValidated chan types.ConfigMessage + sigs chan os.Signal + stopChan chan bool + providers []provider.Provider + serverLock sync.Mutex + currentConfigurations configs + globalConfiguration GlobalConfiguration + loggerMiddleware *middlewares.Logger +} + +// NewServer returns an initialized Server. +func NewServer(globalConfiguration GlobalConfiguration) *Server { + server := new(Server) + + server.configurationChan = make(chan types.ConfigMessage, 10) + server.configurationChanValidated = make(chan types.ConfigMessage, 10) + server.sigs = make(chan os.Signal, 1) + server.stopChan = make(chan bool) + server.providers = []provider.Provider{} + signal.Notify(server.sigs, syscall.SIGINT, syscall.SIGTERM) + server.currentConfigurations = make(configs) + server.globalConfiguration = globalConfiguration + server.loggerMiddleware = middlewares.NewLogger(globalConfiguration.AccessLogsFile) + + return server +} + +// Start starts the server and blocks until server is shutted down. +func (server *Server) Start() { + server.configurationRouter = LoadDefaultConfig(server.globalConfiguration) + go server.listenProviders() + go server.enableRouter() + server.configureProviders() + server.startProviders() + go server.listenSignals() + + var er error + server.serverLock.Lock() + server.srv, er = server.prepareServer(server.configurationRouter, server.globalConfiguration, nil, server.loggerMiddleware, metrics) + if er != nil { + log.Fatal("Error preparing server: ", er) + } + go server.startServer(server.srv, server.globalConfiguration) + //TODO change that! + time.Sleep(100 * time.Millisecond) + server.serverLock.Unlock() + + <-server.stopChan +} + +// Stop stops the server +func (server *Server) Stop() { + server.srv.Close() + server.stopChan <- true +} + +// Close destroys the server +func (server *Server) Close() { + defer close(server.configurationChan) + defer close(server.configurationChanValidated) + defer close(server.sigs) + defer close(server.stopChan) + defer server.loggerMiddleware.Close() +} + +func (server *Server) listenProviders() { + lastReceivedConfiguration := time.Unix(0, 0) + lastConfigs := make(map[string]*types.ConfigMessage) + for { + configMsg := <-server.configurationChan + log.Infof("Configuration receveived from provider %s: %#v", configMsg.ProviderName, configMsg.Configuration) + lastConfigs[configMsg.ProviderName] = &configMsg + if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { + log.Infof("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) + // last config received more than n s ago + server.configurationChanValidated <- configMsg + } else { + log.Infof("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) + go func() { + <-time.After(server.globalConfiguration.ProvidersThrottleDuration) + if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { + log.Infof("Waited for %s config, OK", configMsg.ProviderName) + server.configurationChanValidated <- *lastConfigs[configMsg.ProviderName] + } + }() + } + lastReceivedConfiguration = time.Now() + } +} + +func (server *Server) enableRouter() { + for { + configMsg := <-server.configurationChanValidated + if configMsg.Configuration == nil { + log.Info("Skipping empty Configuration") + } else if reflect.DeepEqual(server.currentConfigurations[configMsg.ProviderName], configMsg.Configuration) { + log.Info("Skipping same configuration") + } else { + // Copy configurations to new map so we don't change current if LoadConfig fails + newConfigurations := make(configs) + for k, v := range server.currentConfigurations { + newConfigurations[k] = v + } + newConfigurations[configMsg.ProviderName] = configMsg.Configuration + + newConfigurationRouter, err := server.loadConfig(newConfigurations, server.globalConfiguration) + if err == nil { + server.serverLock.Lock() + server.currentConfigurations = newConfigurations + server.configurationRouter = newConfigurationRouter + oldServer := server.srv + newsrv, err := server.prepareServer(server.configurationRouter, server.globalConfiguration, oldServer, server.loggerMiddleware, metrics) + if err != nil { + log.Fatal("Error preparing server: ", err) + } + go server.startServer(newsrv, server.globalConfiguration) + server.srv = newsrv + time.Sleep(1 * time.Second) + if oldServer != nil { + log.Info("Stopping old server") + oldServer.Close() + } + server.serverLock.Unlock() + } else { + log.Error("Error loading new configuration, aborted ", err) + } + } + } +} + +func (server *Server) configureProviders() { + // configure providers + if server.globalConfiguration.Docker != nil { + server.providers = append(server.providers, server.globalConfiguration.Docker) + } + if server.globalConfiguration.Marathon != nil { + server.providers = append(server.providers, server.globalConfiguration.Marathon) + } + if server.globalConfiguration.File != nil { + if len(server.globalConfiguration.File.Filename) == 0 { + // no filename, setting to global config file + server.globalConfiguration.File.Filename = *globalConfigFile + } + server.providers = append(server.providers, server.globalConfiguration.File) + } + if server.globalConfiguration.Web != nil { + server.globalConfiguration.Web.server = server + server.providers = append(server.providers, server.globalConfiguration.Web) + } + if server.globalConfiguration.Consul != nil { + server.providers = append(server.providers, server.globalConfiguration.Consul) + } + if server.globalConfiguration.Etcd != nil { + server.providers = append(server.providers, server.globalConfiguration.Etcd) + } + if server.globalConfiguration.Zookeeper != nil { + server.providers = append(server.providers, server.globalConfiguration.Zookeeper) + } + if server.globalConfiguration.Boltdb != nil { + server.providers = append(server.providers, server.globalConfiguration.Boltdb) + } +} + +func (server *Server) startProviders() { + // start providers + for _, provider := range server.providers { + log.Infof("Starting provider %v %+v", reflect.TypeOf(provider), provider) + currentProvider := provider + go func() { + err := currentProvider.Provide(server.configurationChan) + if err != nil { + log.Errorf("Error starting provider %s", err) + } + }() + } +} + +func (server *Server) listenSignals() { + sig := <-server.sigs + log.Infof("I have to go... %+v", sig) + log.Info("Stopping server") + server.Stop() +} + +// creates a TLS config that allows terminating HTTPS for multiple domains using SNI +func (server *Server) createTLSConfig(certs []Certificate) (*tls.Config, error) { + if len(certs) == 0 { + return nil, nil + } + + config := &tls.Config{} + if config.NextProtos == nil { + config.NextProtos = []string{"http/1.1"} + } + + var err error + config.Certificates = make([]tls.Certificate, len(certs)) + for i, v := range certs { + config.Certificates[i], err = tls.LoadX509KeyPair(v.CertFile, v.KeyFile) + if err != nil { + return nil, err + } + } + // BuildNameToCertificate parses the CommonName and SubjectAlternateName fields + // in each certificate and populates the config.NameToCertificate map. + config.BuildNameToCertificate() + return config, nil +} + +func (server *Server) startServer(srv *manners.GracefulServer, globalConfiguration GlobalConfiguration) { + log.Info("Starting server") + if srv.TLSConfig != nil { + err := srv.ListenAndServeTLSWithConfig(srv.TLSConfig) + if err != nil { + log.Fatal("Error creating server: ", err) + } + } else { + err := srv.ListenAndServe() + if err != nil { + log.Fatal("Error creating server: ", err) + } + } + log.Info("Server stopped") +} + +func (server *Server) prepareServer(router *mux.Router, globalConfiguration GlobalConfiguration, oldServer *manners.GracefulServer, middlewares ...negroni.Handler) (*manners.GracefulServer, error) { + log.Info("Preparing server") + // middlewares + var negroni = negroni.New() + for _, middleware := range middlewares { + negroni.Use(middleware) + } + negroni.UseHandler(router) + tlsConfig, err := server.createTLSConfig(globalConfiguration.Certificates) + if err != nil { + log.Fatalf("Error creating TLS config %s", err) + return nil, err + } + + if oldServer == nil { + return manners.NewWithServer( + &http.Server{ + Addr: globalConfiguration.Port, + Handler: negroni, + TLSConfig: tlsConfig, + }), nil + } + gracefulServer, err := oldServer.HijackListener(&http.Server{ + Addr: globalConfiguration.Port, + Handler: negroni, + TLSConfig: tlsConfig, + }, tlsConfig) + if err != nil { + log.Fatalf("Error hijacking server %s", err) + return nil, err + } + return gracefulServer, nil +} + +// LoadConfig returns a new gorilla.mux Route from the specified global configuration and the dynamic +// provider configurations. +func (server *Server) loadConfig(configurations configs, globalConfiguration GlobalConfiguration) (*mux.Router, error) { + router := mux.NewRouter() + router.NotFoundHandler = http.HandlerFunc(notFoundHandler) + backends := map[string]http.Handler{} + for _, configuration := range configurations { + for frontendName, frontend := range configuration.Frontends { + log.Debugf("Creating frontend %s", frontendName) + fwd, _ := forward.New(forward.Logger(oxyLogger), forward.PassHostHeader(frontend.PassHostHeader)) + newRoute := router.NewRoute().Name(frontendName) + for routeName, route := range frontend.Routes { + log.Debugf("Creating route %s %s:%s", routeName, route.Rule, route.Value) + newRouteReflect, err := invoke(newRoute, route.Rule, route.Value) + if err != nil { + return nil, err + } + newRoute = newRouteReflect[0].Interface().(*mux.Route) + } + if backends[frontend.Backend] == nil { + log.Debugf("Creating backend %s", frontend.Backend) + var lb http.Handler + rr, _ := roundrobin.New(fwd) + if configuration.Backends[frontend.Backend] == nil { + return nil, errors.New("Backend not found: " + frontend.Backend) + } + lbMethod, err := types.NewLoadBalancerMethod(configuration.Backends[frontend.Backend].LoadBalancer) + if err != nil { + configuration.Backends[frontend.Backend].LoadBalancer = &types.LoadBalancer{Method: "wrr"} + } + switch lbMethod { + case types.Drr: + log.Infof("Creating load-balancer drr") + rebalancer, _ := roundrobin.NewRebalancer(rr, roundrobin.RebalancerLogger(oxyLogger)) + lb = rebalancer + for serverName, server := range configuration.Backends[frontend.Backend].Servers { + url, err := url.Parse(server.URL) + if err != nil { + return nil, err + } + log.Infof("Creating server %s %s", serverName, url.String()) + rebalancer.UpsertServer(url, roundrobin.Weight(server.Weight)) + } + case types.Wrr: + log.Infof("Creating load-balancer wrr") + lb = middlewares.NewWebsocketUpgrader(rr) + for serverName, server := range configuration.Backends[frontend.Backend].Servers { + url, err := url.Parse(server.URL) + if err != nil { + return nil, err + } + log.Infof("Creating server %s %s", serverName, url.String()) + rr.UpsertServer(url, roundrobin.Weight(server.Weight)) + } + } + var negroni = negroni.New() + if configuration.Backends[frontend.Backend].CircuitBreaker != nil { + log.Infof("Creating circuit breaker %s", configuration.Backends[frontend.Backend].CircuitBreaker.Expression) + negroni.Use(middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger))) + } else { + negroni.UseHandler(lb) + } + backends[frontend.Backend] = negroni + } else { + log.Infof("Reusing backend %s", frontend.Backend) + } + // stream.New(backends[frontend.Backend], stream.Retry("IsNetworkError() && Attempts() <= " + strconv.Itoa(globalConfiguration.Replay)), stream.Logger(oxyLogger)) + + newRoute.Handler(backends[frontend.Backend]) + err := newRoute.GetError() + if err != nil { + log.Errorf("Error building route: %s", err) + } + } + } + return router, nil +} diff --git a/traefik.go b/traefik.go index c84d99ba4..562fd3a62 100644 --- a/traefik.go +++ b/traefik.go @@ -1,40 +1,24 @@ package main import ( - "crypto/tls" "errors" fmtlog "log" - "net/http" - "net/url" "os" - "os/signal" "reflect" "runtime" "strings" - "syscall" - "time" log "github.com/Sirupsen/logrus" - "github.com/codegangsta/negroni" "github.com/emilevauge/traefik/middlewares" - "github.com/emilevauge/traefik/provider" - "github.com/emilevauge/traefik/types" - "github.com/gorilla/mux" - "github.com/mailgun/manners" - "github.com/mailgun/oxy/cbreaker" - "github.com/mailgun/oxy/forward" - "github.com/mailgun/oxy/roundrobin" "github.com/thoas/stats" "gopkg.in/alecthomas/kingpin.v2" - "sync" ) var ( - globalConfigFile = kingpin.Arg("conf", "Main configration file.").Default("traefik.toml").String() - version = kingpin.Flag("version", "Get Version.").Short('v').Bool() - currentConfigurations = make(configs) - metrics = stats.New() - oxyLogger = &OxyLogger{} + globalConfigFile = kingpin.Arg("conf", "Main configration file.").Default("traefik.toml").String() + version = kingpin.Flag("version", "Get Version.").Short('v').Bool() + metrics = stats.New() + oxyLogger = &OxyLogger{} ) func main() { @@ -42,19 +26,6 @@ func main() { kingpin.Version(Version + " built on the " + BuildDate) kingpin.Parse() fmtlog.SetFlags(fmtlog.Lshortfile | fmtlog.LstdFlags) - var srv *manners.GracefulServer - var configurationRouter *mux.Router - var configurationChan = make(chan types.ConfigMessage, 10) - defer close(configurationChan) - var configurationChanValidated = make(chan types.ConfigMessage, 10) - defer close(configurationChanValidated) - var sigs = make(chan os.Signal, 1) - defer close(sigs) - var stopChan = make(chan bool) - defer close(stopChan) - var providers = []provider.Provider{} - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - var serverLock sync.Mutex // load global configuration globalConfiguration := LoadFileConfig(*globalConfigFile) @@ -82,294 +53,12 @@ func main() { log.SetFormatter(&log.TextFormatter{FullTimestamp: true, DisableSorting: true}) } log.Debugf("Global configuration loaded %+v", globalConfiguration) - configurationRouter = LoadDefaultConfig(globalConfiguration) - - // listen new configurations from providers - go func() { - lastReceivedConfiguration := time.Unix(0, 0) - lastConfigs := make(map[string]*types.ConfigMessage) - for { - configMsg := <-configurationChan - log.Infof("Configuration receveived from provider %s: %#v", configMsg.ProviderName, configMsg.Configuration) - lastConfigs[configMsg.ProviderName] = &configMsg - if time.Now().After(lastReceivedConfiguration.Add(time.Duration(globalConfiguration.ProvidersThrottleDuration))) { - log.Infof("Last %s config received more than %s, OK", configMsg.ProviderName, globalConfiguration.ProvidersThrottleDuration) - // last config received more than n s ago - configurationChanValidated <- configMsg - } else { - log.Infof("Last %s config received less than %s, waiting...", configMsg.ProviderName, globalConfiguration.ProvidersThrottleDuration) - go func() { - <-time.After(globalConfiguration.ProvidersThrottleDuration) - if time.Now().After(lastReceivedConfiguration.Add(time.Duration(globalConfiguration.ProvidersThrottleDuration))) { - log.Infof("Waited for %s config, OK", configMsg.ProviderName) - configurationChanValidated <- *lastConfigs[configMsg.ProviderName] - } - }() - } - lastReceivedConfiguration = time.Now() - } - }() - go func() { - for { - configMsg := <-configurationChanValidated - if configMsg.Configuration == nil { - log.Info("Skipping empty Configuration") - } else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) { - log.Info("Skipping same configuration") - } else { - // Copy configurations to new map so we don't change current if LoadConfig fails - newConfigurations := make(configs) - for k, v := range currentConfigurations { - newConfigurations[k] = v - } - newConfigurations[configMsg.ProviderName] = configMsg.Configuration - - newConfigurationRouter, err := LoadConfig(newConfigurations, globalConfiguration) - if err == nil { - serverLock.Lock() - currentConfigurations = newConfigurations - configurationRouter = newConfigurationRouter - oldServer := srv - newsrv, err := prepareServer(configurationRouter, globalConfiguration, oldServer, loggerMiddleware, metrics) - if err != nil { - log.Fatal("Error preparing server: ", err) - } - go startServer(newsrv, globalConfiguration) - srv = newsrv - time.Sleep(1 * time.Second) - if oldServer != nil { - log.Info("Stopping old server") - oldServer.Close() - } - serverLock.Unlock() - } else { - log.Error("Error loading new configuration, aborted ", err) - } - } - } - }() - - // configure providers - if globalConfiguration.Docker != nil { - providers = append(providers, globalConfiguration.Docker) - } - if globalConfiguration.Marathon != nil { - providers = append(providers, globalConfiguration.Marathon) - } - if globalConfiguration.File != nil { - if len(globalConfiguration.File.Filename) == 0 { - // no filename, setting to global config file - globalConfiguration.File.Filename = *globalConfigFile - } - providers = append(providers, globalConfiguration.File) - } - if globalConfiguration.Web != nil { - providers = append(providers, globalConfiguration.Web) - } - if globalConfiguration.Consul != nil { - providers = append(providers, globalConfiguration.Consul) - } - if globalConfiguration.Etcd != nil { - providers = append(providers, globalConfiguration.Etcd) - } - if globalConfiguration.Zookeeper != nil { - providers = append(providers, globalConfiguration.Zookeeper) - } - if globalConfiguration.Boltdb != nil { - providers = append(providers, globalConfiguration.Boltdb) - } - - // start providers - for _, provider := range providers { - log.Infof("Starting provider %v %+v", reflect.TypeOf(provider), provider) - currentProvider := provider - go func() { - err := currentProvider.Provide(configurationChan) - if err != nil { - log.Errorf("Error starting provider %s", err) - } - }() - } - - go func() { - sig := <-sigs - log.Infof("I have to go... %+v", sig) - log.Info("Stopping server") - srv.Close() - stopChan <- true - }() - - //negroni.Use(middlewares.NewCircuitBreaker(oxyLogger)) - //negroni.Use(middlewares.NewRoutes(configurationRouter)) - - var er error - serverLock.Lock() - srv, er = prepareServer(configurationRouter, globalConfiguration, nil, loggerMiddleware, metrics) - if er != nil { - log.Fatal("Error preparing server: ", er) - } - go startServer(srv, globalConfiguration) - //TODO change that! - time.Sleep(100 * time.Millisecond) - serverLock.Unlock() - - <-stopChan + server := NewServer(*globalConfiguration) + server.Start() + server.Close() log.Info("Shutting down") } -// creates a TLS config that allows terminating HTTPS for multiple domains using SNI -func createTLSConfig(certs []Certificate) (*tls.Config, error) { - if len(certs) == 0 { - return nil, nil - } - - config := &tls.Config{} - if config.NextProtos == nil { - config.NextProtos = []string{"http/1.1"} - } - - var err error - config.Certificates = make([]tls.Certificate, len(certs)) - for i, v := range certs { - config.Certificates[i], err = tls.LoadX509KeyPair(v.CertFile, v.KeyFile) - if err != nil { - return nil, err - } - } - // BuildNameToCertificate parses the CommonName and SubjectAlternateName fields - // in each certificate and populates the config.NameToCertificate map. - config.BuildNameToCertificate() - return config, nil -} - -func startServer(srv *manners.GracefulServer, globalConfiguration *GlobalConfiguration) { - log.Info("Starting server") - if srv.TLSConfig != nil { - err := srv.ListenAndServeTLSWithConfig(srv.TLSConfig) - if err != nil { - log.Fatal("Error creating server: ", err) - } - } else { - err := srv.ListenAndServe() - if err != nil { - log.Fatal("Error creating server: ", err) - } - } - log.Info("Server stopped") -} - -func prepareServer(router *mux.Router, globalConfiguration *GlobalConfiguration, oldServer *manners.GracefulServer, middlewares ...negroni.Handler) (*manners.GracefulServer, error) { - log.Info("Preparing server") - // middlewares - var negroni = negroni.New() - for _, middleware := range middlewares { - negroni.Use(middleware) - } - negroni.UseHandler(router) - tlsConfig, err := createTLSConfig(globalConfiguration.Certificates) - if err != nil { - log.Fatalf("Error creating TLS config %s", err) - return nil, err - } - - if oldServer == nil { - return manners.NewWithServer( - &http.Server{ - Addr: globalConfiguration.Port, - Handler: negroni, - TLSConfig: tlsConfig, - }), nil - } - server, err := oldServer.HijackListener(&http.Server{ - Addr: globalConfiguration.Port, - Handler: negroni, - TLSConfig: tlsConfig, - }, tlsConfig) - if err != nil { - log.Fatalf("Error hijacking server %s", err) - return nil, err - } - return server, nil -} - -// LoadConfig returns a new gorrilla.mux Route from the specified global configuration and the dynamic -// provider configurations. -func LoadConfig(configurations configs, globalConfiguration *GlobalConfiguration) (*mux.Router, error) { - router := mux.NewRouter() - router.NotFoundHandler = http.HandlerFunc(notFoundHandler) - backends := map[string]http.Handler{} - for _, configuration := range configurations { - for frontendName, frontend := range configuration.Frontends { - log.Debugf("Creating frontend %s", frontendName) - fwd, _ := forward.New(forward.Logger(oxyLogger), forward.PassHostHeader(frontend.PassHostHeader)) - newRoute := router.NewRoute().Name(frontendName) - for routeName, route := range frontend.Routes { - log.Debugf("Creating route %s %s:%s", routeName, route.Rule, route.Value) - newRouteReflect, err := invoke(newRoute, route.Rule, route.Value) - if err != nil { - return nil, err - } - newRoute = newRouteReflect[0].Interface().(*mux.Route) - } - if backends[frontend.Backend] == nil { - log.Debugf("Creating backend %s", frontend.Backend) - var lb http.Handler - rr, _ := roundrobin.New(fwd) - if configuration.Backends[frontend.Backend] == nil { - return nil, errors.New("Backend not found: " + frontend.Backend) - } - lbMethod, err := types.NewLoadBalancerMethod(configuration.Backends[frontend.Backend].LoadBalancer) - if err != nil { - configuration.Backends[frontend.Backend].LoadBalancer = &types.LoadBalancer{Method: "wrr"} - } - switch lbMethod { - case types.Drr: - log.Infof("Creating load-balancer drr") - rebalancer, _ := roundrobin.NewRebalancer(rr, roundrobin.RebalancerLogger(oxyLogger)) - lb = rebalancer - for serverName, server := range configuration.Backends[frontend.Backend].Servers { - url, err := url.Parse(server.URL) - if err != nil { - return nil, err - } - log.Infof("Creating server %s %s", serverName, url.String()) - rebalancer.UpsertServer(url, roundrobin.Weight(server.Weight)) - } - case types.Wrr: - log.Infof("Creating load-balancer wrr") - lb = middlewares.NewWebsocketUpgrader(rr) - for serverName, server := range configuration.Backends[frontend.Backend].Servers { - url, err := url.Parse(server.URL) - if err != nil { - return nil, err - } - log.Infof("Creating server %s %s", serverName, url.String()) - rr.UpsertServer(url, roundrobin.Weight(server.Weight)) - } - } - var negroni = negroni.New() - if configuration.Backends[frontend.Backend].CircuitBreaker != nil { - log.Infof("Creating circuit breaker %s", configuration.Backends[frontend.Backend].CircuitBreaker.Expression) - negroni.Use(middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger))) - } else { - negroni.UseHandler(lb) - } - backends[frontend.Backend] = negroni - } else { - log.Infof("Reusing backend %s", frontend.Backend) - } - // stream.New(backends[frontend.Backend], stream.Retry("IsNetworkError() && Attempts() <= " + strconv.Itoa(globalConfiguration.Replay)), stream.Logger(oxyLogger)) - - newRoute.Handler(backends[frontend.Backend]) - err := newRoute.GetError() - if err != nil { - log.Errorf("Error building route: %s", err) - } - } - } - return router, nil -} - // Invoke calls the specified method with the specified arguments on the specified interface. // It uses the go(lang) reflect package. func invoke(any interface{}, name string, args ...interface{}) ([]reflect.Value, error) { diff --git a/web.go b/web.go index cf46392ab..3c4eb91a1 100644 --- a/web.go +++ b/web.go @@ -20,6 +20,7 @@ type WebProvider struct { Address string CertFile, KeyFile string ReadOnly bool + server *Server } var ( @@ -34,12 +35,12 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag systemRouter := mux.NewRouter() // health route - systemRouter.Methods("GET").Path("/health").HandlerFunc(getHealthHandler) + systemRouter.Methods("GET").Path("/health").HandlerFunc(provider.getHealthHandler) // API routes - systemRouter.Methods("GET").Path("/api").HandlerFunc(getConfigHandler) - systemRouter.Methods("GET").Path("/api/providers").HandlerFunc(getConfigHandler) - systemRouter.Methods("GET").Path("/api/providers/{provider}").HandlerFunc(getProviderHandler) + systemRouter.Methods("GET").Path("/api").HandlerFunc(provider.getConfigHandler) + systemRouter.Methods("GET").Path("/api/providers").HandlerFunc(provider.getConfigHandler) + systemRouter.Methods("GET").Path("/api/providers/{provider}").HandlerFunc(provider.getProviderHandler) systemRouter.Methods("PUT").Path("/api/providers/{provider}").HandlerFunc(func(response http.ResponseWriter, request *http.Request) { if provider.ReadOnly { response.WriteHeader(http.StatusForbidden) @@ -58,20 +59,20 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag err := json.Unmarshal(body, configuration) if err == nil { configurationChan <- types.ConfigMessage{"web", configuration} - getConfigHandler(response, request) + provider.getConfigHandler(response, request) } else { log.Errorf("Error parsing configuration %+v", err) http.Error(response, fmt.Sprintf("%+v", err), http.StatusBadRequest) } }) - systemRouter.Methods("GET").Path("/api/providers/{provider}/backends").HandlerFunc(getBackendsHandler) - systemRouter.Methods("GET").Path("/api/providers/{provider}/backends/{backend}").HandlerFunc(getBackendHandler) - systemRouter.Methods("GET").Path("/api/providers/{provider}/backends/{backend}/servers").HandlerFunc(getServersHandler) - systemRouter.Methods("GET").Path("/api/providers/{provider}/backends/{backend}/servers/{server}").HandlerFunc(getServerHandler) - systemRouter.Methods("GET").Path("/api/providers/{provider}/frontends").HandlerFunc(getFrontendsHandler) - systemRouter.Methods("GET").Path("/api/providers/{provider}/frontends/{frontend}").HandlerFunc(getFrontendHandler) - systemRouter.Methods("GET").Path("/api/providers/{provider}/frontends/{frontend}/routes").HandlerFunc(getRoutesHandler) - systemRouter.Methods("GET").Path("/api/providers/{provider}/frontends/{frontend}/routes/{route}").HandlerFunc(getRouteHandler) + systemRouter.Methods("GET").Path("/api/providers/{provider}/backends").HandlerFunc(provider.getBackendsHandler) + systemRouter.Methods("GET").Path("/api/providers/{provider}/backends/{backend}").HandlerFunc(provider.getBackendHandler) + systemRouter.Methods("GET").Path("/api/providers/{provider}/backends/{backend}/servers").HandlerFunc(provider.getServersHandler) + systemRouter.Methods("GET").Path("/api/providers/{provider}/backends/{backend}/servers/{server}").HandlerFunc(provider.getServerHandler) + systemRouter.Methods("GET").Path("/api/providers/{provider}/frontends").HandlerFunc(provider.getFrontendsHandler) + systemRouter.Methods("GET").Path("/api/providers/{provider}/frontends/{frontend}").HandlerFunc(provider.getFrontendHandler) + systemRouter.Methods("GET").Path("/api/providers/{provider}/frontends/{frontend}/routes").HandlerFunc(provider.getRoutesHandler) + systemRouter.Methods("GET").Path("/api/providers/{provider}/frontends/{frontend}/routes/{route}").HandlerFunc(provider.getRouteHandler) // Expose dashboard systemRouter.Methods("GET").Path("/").HandlerFunc(func(response http.ResponseWriter, request *http.Request) { @@ -95,39 +96,39 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag return nil } -func getHealthHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getHealthHandler(response http.ResponseWriter, request *http.Request) { templatesRenderer.JSON(response, http.StatusOK, metrics.Data()) } -func getConfigHandler(response http.ResponseWriter, request *http.Request) { - templatesRenderer.JSON(response, http.StatusOK, currentConfigurations) +func (provider *WebProvider) getConfigHandler(response http.ResponseWriter, request *http.Request) { + templatesRenderer.JSON(response, http.StatusOK, provider.server.currentConfigurations) } -func getProviderHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getProviderHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] - if provider, ok := currentConfigurations[providerID]; ok { + if provider, ok := provider.server.currentConfigurations[providerID]; ok { templatesRenderer.JSON(response, http.StatusOK, provider) } else { http.NotFound(response, request) } } -func getBackendsHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getBackendsHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] - if provider, ok := currentConfigurations[providerID]; ok { + if provider, ok := provider.server.currentConfigurations[providerID]; ok { templatesRenderer.JSON(response, http.StatusOK, provider.Backends) } else { http.NotFound(response, request) } } -func getBackendHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getBackendHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] backendID := vars["backend"] - if provider, ok := currentConfigurations[providerID]; ok { + if provider, ok := provider.server.currentConfigurations[providerID]; ok { if backend, ok := provider.Backends[backendID]; ok { templatesRenderer.JSON(response, http.StatusOK, backend) return @@ -136,11 +137,11 @@ func getBackendHandler(response http.ResponseWriter, request *http.Request) { http.NotFound(response, request) } -func getServersHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getServersHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] backendID := vars["backend"] - if provider, ok := currentConfigurations[providerID]; ok { + if provider, ok := provider.server.currentConfigurations[providerID]; ok { if backend, ok := provider.Backends[backendID]; ok { templatesRenderer.JSON(response, http.StatusOK, backend.Servers) return @@ -149,12 +150,12 @@ func getServersHandler(response http.ResponseWriter, request *http.Request) { http.NotFound(response, request) } -func getServerHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getServerHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] backendID := vars["backend"] serverID := vars["server"] - if provider, ok := currentConfigurations[providerID]; ok { + if provider, ok := provider.server.currentConfigurations[providerID]; ok { if backend, ok := provider.Backends[backendID]; ok { if server, ok := backend.Servers[serverID]; ok { templatesRenderer.JSON(response, http.StatusOK, server) @@ -165,21 +166,21 @@ func getServerHandler(response http.ResponseWriter, request *http.Request) { http.NotFound(response, request) } -func getFrontendsHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getFrontendsHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] - if provider, ok := currentConfigurations[providerID]; ok { + if provider, ok := provider.server.currentConfigurations[providerID]; ok { templatesRenderer.JSON(response, http.StatusOK, provider.Frontends) } else { http.NotFound(response, request) } } -func getFrontendHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getFrontendHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] frontendID := vars["frontend"] - if provider, ok := currentConfigurations[providerID]; ok { + if provider, ok := provider.server.currentConfigurations[providerID]; ok { if frontend, ok := provider.Frontends[frontendID]; ok { templatesRenderer.JSON(response, http.StatusOK, frontend) return @@ -188,11 +189,11 @@ func getFrontendHandler(response http.ResponseWriter, request *http.Request) { http.NotFound(response, request) } -func getRoutesHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getRoutesHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] frontendID := vars["frontend"] - if provider, ok := currentConfigurations[providerID]; ok { + if provider, ok := provider.server.currentConfigurations[providerID]; ok { if frontend, ok := provider.Frontends[frontendID]; ok { templatesRenderer.JSON(response, http.StatusOK, frontend.Routes) return @@ -201,12 +202,12 @@ func getRoutesHandler(response http.ResponseWriter, request *http.Request) { http.NotFound(response, request) } -func getRouteHandler(response http.ResponseWriter, request *http.Request) { +func (provider *WebProvider) getRouteHandler(response http.ResponseWriter, request *http.Request) { vars := mux.Vars(request) providerID := vars["provider"] frontendID := vars["frontend"] routeID := vars["route"] - if provider, ok := currentConfigurations[providerID]; ok { + if provider, ok := provider.server.currentConfigurations[providerID]; ok { if frontend, ok := provider.Frontends[frontendID]; ok { if route, ok := frontend.Routes[routeID]; ok { templatesRenderer.JSON(response, http.StatusOK, route)