Circuit breaker middleware

This commit is contained in:
emile 2015-09-19 13:02:59 +02:00
parent 9d46c5017c
commit 616c58ad4b
5 changed files with 97 additions and 40 deletions

View file

@ -49,3 +49,7 @@ image: build
dist: dist:
mkdir dist mkdir dist
run-dev:
go build
./traefik

43
adapters.go Normal file
View file

@ -0,0 +1,43 @@
/*
Copyright
*/
package main
import (
"net/http"
"github.com/mailgun/oxy/utils"
"github.com/gorilla/mux"
)
type OxyLogger struct{
}
func (oxylogger *OxyLogger) Infof(format string, args ...interface{}) {
log.Debug(format, args...)
}
func (oxylogger *OxyLogger) Warningf(format string, args ...interface{}) {
log.Warning(format, args...)
}
func (oxylogger *OxyLogger) Errorf(format string, args ...interface{}) {
log.Error(format, args...)
}
type ErrorHandler struct {
}
func (e *ErrorHandler) ServeHTTP(w http.ResponseWriter, req *http.Request, err error) {
log.Error("server error ", err.Error())
utils.DefaultHandler.ServeHTTP(w, req, err)
}
func notFoundHandler(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
//templatesRenderer.HTML(w, http.StatusNotFound, "notFound", nil)
}
func LoadDefaultConfig(gloablConfiguration *GlobalConfiguration) *mux.Router {
router := mux.NewRouter()
router.NotFoundHandler = http.HandlerFunc(notFoundHandler)
return router
}

View file

@ -172,7 +172,7 @@ func (provider *MarathonProvider) loadMarathonConfig() *Configuration {
err = tmpl.Execute(&buffer, templateObjects) err = tmpl.Execute(&buffer, templateObjects)
if err != nil { if err != nil {
log.Error("Error with docker template:", err) log.Error("Error with marathon template:", err)
return nil return nil
} }

21
middlewares/cbreaker.go Normal file
View file

@ -0,0 +1,21 @@
/*
Copyright
*/
package middlewares
import (
"net/http"
"github.com/mailgun/oxy/cbreaker"
)
type CircuitBreaker struct {
circuitBreaker *cbreaker.CircuitBreaker
}
func NewCircuitBreaker(next http.Handler, options ...cbreaker.CircuitBreakerOption) *CircuitBreaker {
circuitBreaker, _ := cbreaker.New(next, "NetworkErrorRatio() > 0.5", options...)
return &CircuitBreaker{circuitBreaker}
}
func (cb *CircuitBreaker) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http.HandlerFunc) {
cb.circuitBreaker.ServeHTTP(rw, r)
}

View file

@ -19,6 +19,8 @@ import (
"reflect" "reflect"
"syscall" "syscall"
"time" "time"
fmtlog "log"
"github.com/mailgun/oxy/cbreaker"
) )
var ( var (
@ -34,23 +36,9 @@ var (
}) })
) )
type OxyLogger struct{
}
func (oxylogger *OxyLogger) Infof(format string, args ...interface{}) {
log.Info(format, args...)
}
func (oxylogger *OxyLogger) Warningf(format string, args ...interface{}) {
log.Warning(format, args...)
}
func (oxylogger *OxyLogger) Errorf(format string, args ...interface{}) {
log.Error(format, args...)
}
func main() { func main() {
kingpin.Parse() kingpin.Parse()
fmtlog.SetFlags(fmtlog.Lshortfile | fmtlog.LstdFlags)
var srv *graceful.Server var srv *graceful.Server
var configurationRouter *mux.Router var configurationRouter *mux.Router
var configurationChan = make(chan *Configuration) var configurationChan = make(chan *Configuration)
@ -107,10 +95,15 @@ func main() {
} else if reflect.DeepEqual(currentConfiguration, configuration) { } else if reflect.DeepEqual(currentConfiguration, configuration) {
log.Info("Skipping same configuration") log.Info("Skipping same configuration")
} else { } else {
newConfigurationRouter, err := LoadConfig(configuration, gloablConfiguration)
if (err == nil ){
currentConfiguration = configuration currentConfiguration = configuration
configurationRouter = LoadConfig(configuration, gloablConfiguration) configurationRouter = newConfigurationRouter
srv.Stop(time.Duration(gloablConfiguration.GraceTimeOut) * time.Second) srv.Stop(time.Duration(gloablConfiguration.GraceTimeOut) * time.Second)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
}else{
log.Error("Error loading new configuration, aborted ", err)
}
} }
} }
}() }()
@ -132,6 +125,7 @@ func main() {
if gloablConfiguration.Web != nil { if gloablConfiguration.Web != nil {
providers = append(providers, gloablConfiguration.Web) providers = append(providers, gloablConfiguration.Web)
} }
// providers = append(providers, NewConsulProvider())
// start providers // start providers
for _, provider := range providers { for _, provider := range providers {
@ -159,6 +153,7 @@ func main() {
var negroni = negroni.New() var negroni = negroni.New()
negroni.Use(metrics) negroni.Use(metrics)
negroni.Use(loggerMiddleware) negroni.Use(loggerMiddleware)
//negroni.Use(middlewares.NewCircuitBreaker(oxyLogger))
//negroni.Use(middlewares.NewRoutes(configurationRouter)) //negroni.Use(middlewares.NewRoutes(configurationRouter))
negroni.UseHandler(configurationRouter) negroni.UseHandler(configurationRouter)
@ -185,27 +180,16 @@ func main() {
} }
} }
func notFoundHandler(w http.ResponseWriter, r *http.Request) { func LoadConfig(configuration *Configuration, gloablConfiguration *GlobalConfiguration) (*mux.Router, error) {
http.NotFound(w, r)
//templatesRenderer.HTML(w, http.StatusNotFound, "notFound", nil)
}
func LoadDefaultConfig(gloablConfiguration *GlobalConfiguration) *mux.Router {
router := mux.NewRouter()
router.NotFoundHandler = http.HandlerFunc(notFoundHandler)
return router
}
func LoadConfig(configuration *Configuration, gloablConfiguration *GlobalConfiguration) *mux.Router {
router := mux.NewRouter() router := mux.NewRouter()
router.NotFoundHandler = http.HandlerFunc(notFoundHandler) router.NotFoundHandler = http.HandlerFunc(notFoundHandler)
backends := map[string]http.Handler{} backends := map[string]http.Handler{}
for frontendName, frontend := range configuration.Frontends { for frontendName, frontend := range configuration.Frontends {
log.Debug("Creating frontend %s", frontendName) log.Debug("Creating frontend %s", frontendName)
fwd, _ := forward.New() fwd, _ := forward.New(forward.Logger(oxyLogger))
newRoute := router.NewRoute().Name(frontendName) newRoute := router.NewRoute().Name(frontendName)
for routeName, route := range frontend.Routes { for routeName, route := range frontend.Routes {
log.Debug("Creating route %s", routeName) log.Debug("Creating route %s %s:%s", routeName, route.Rule, route.Value)
newRouteReflect := Invoke(newRoute, route.Rule, route.Value) newRouteReflect := Invoke(newRoute, route.Rule, route.Value)
newRoute = newRouteReflect[0].Interface().(*mux.Route) newRoute = newRouteReflect[0].Interface().(*mux.Route)
} }
@ -214,22 +198,27 @@ func LoadConfig(configuration *Configuration, gloablConfiguration *GlobalConfigu
lb, _ := roundrobin.New(fwd) lb, _ := roundrobin.New(fwd)
rb, _ := roundrobin.NewRebalancer(lb, roundrobin.RebalancerLogger(oxyLogger)) rb, _ := roundrobin.NewRebalancer(lb, roundrobin.RebalancerLogger(oxyLogger))
for serverName, server := range configuration.Backends[frontend.Backend].Servers { for serverName, server := range configuration.Backends[frontend.Backend].Servers {
log.Debug("Creating server %s", serverName) if url, err := url.Parse(server.Url); err != nil{
url, _ := url.Parse(server.Url) return nil, err
}else{
log.Debug("Creating server %s %s", serverName, url.String())
rb.UpsertServer(url, roundrobin.Weight(server.Weight)) rb.UpsertServer(url, roundrobin.Weight(server.Weight))
} }
backends[frontend.Backend] = lb }
backends[frontend.Backend] = rb
} else { } else {
log.Debug("Reusing backend %s", frontend.Backend) log.Debug("Reusing backend %s", frontend.Backend)
} }
// stream.New(backends[frontend.Backend], stream.Retry("IsNetworkError() && Attempts() <= " + strconv.Itoa(gloablConfiguration.Replay)), stream.Logger(oxyLogger)) // stream.New(backends[frontend.Backend], stream.Retry("IsNetworkError() && Attempts() <= " + strconv.Itoa(gloablConfiguration.Replay)), stream.Logger(oxyLogger))
newRoute.Handler(backends[frontend.Backend]) var negroni = negroni.New()
negroni.Use(middlewares.NewCircuitBreaker(backends[frontend.Backend], cbreaker.Logger(oxyLogger)))
newRoute.Handler(negroni)
err := newRoute.GetError() err := newRoute.GetError()
if err != nil { if err != nil {
log.Error("Error building route ", err) log.Error("Error building route ", err)
} }
} }
return router return router, nil
} }
func Invoke(any interface{}, name string, args ...interface{}) []reflect.Value { func Invoke(any interface{}, name string, args ...interface{}) []reflect.Value {