Handle capture on redefined http.responseWriters
Co-authored-by: Mathieu Lonjaret <mathieu.lonjaret@gmail.com>
This commit is contained in:
parent
7582da9650
commit
a041a6b198
11 changed files with 228 additions and 83 deletions
|
@ -31,7 +31,6 @@ import (
|
||||||
"github.com/traefik/traefik/v2/pkg/log"
|
"github.com/traefik/traefik/v2/pkg/log"
|
||||||
"github.com/traefik/traefik/v2/pkg/metrics"
|
"github.com/traefik/traefik/v2/pkg/metrics"
|
||||||
"github.com/traefik/traefik/v2/pkg/middlewares/accesslog"
|
"github.com/traefik/traefik/v2/pkg/middlewares/accesslog"
|
||||||
"github.com/traefik/traefik/v2/pkg/middlewares/capture"
|
|
||||||
"github.com/traefik/traefik/v2/pkg/provider/acme"
|
"github.com/traefik/traefik/v2/pkg/provider/acme"
|
||||||
"github.com/traefik/traefik/v2/pkg/provider/aggregator"
|
"github.com/traefik/traefik/v2/pkg/provider/aggregator"
|
||||||
"github.com/traefik/traefik/v2/pkg/provider/hub"
|
"github.com/traefik/traefik/v2/pkg/provider/hub"
|
||||||
|
@ -260,9 +259,8 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err
|
||||||
|
|
||||||
accessLog := setupAccessLog(staticConfiguration.AccessLog)
|
accessLog := setupAccessLog(staticConfiguration.AccessLog)
|
||||||
tracer := setupTracing(staticConfiguration.Tracing)
|
tracer := setupTracing(staticConfiguration.Tracing)
|
||||||
captureMiddleware := setupCapture(staticConfiguration)
|
|
||||||
|
|
||||||
chainBuilder := middleware.NewChainBuilder(metricsRegistry, accessLog, tracer, captureMiddleware)
|
chainBuilder := middleware.NewChainBuilder(metricsRegistry, accessLog, tracer)
|
||||||
routerFactory := server.NewRouterFactory(*staticConfiguration, managerFactory, tlsManager, chainBuilder, pluginBuilder, metricsRegistry)
|
routerFactory := server.NewRouterFactory(*staticConfiguration, managerFactory, tlsManager, chainBuilder, pluginBuilder, metricsRegistry)
|
||||||
|
|
||||||
// Watcher
|
// Watcher
|
||||||
|
@ -565,13 +563,6 @@ func setupTracing(conf *static.Tracing) *tracing.Tracing {
|
||||||
return tracer
|
return tracer
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupCapture(staticConfiguration *static.Configuration) *capture.Handler {
|
|
||||||
if staticConfiguration.AccessLog == nil && staticConfiguration.Metrics == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return &capture.Handler{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func configureLogging(staticConfiguration *static.Configuration) {
|
func configureLogging(staticConfiguration *static.Configuration) {
|
||||||
// configure default log flags
|
// configure default log flags
|
||||||
stdlog.SetFlags(stdlog.Lshortfile | stdlog.LstdFlags)
|
stdlog.SetFlags(stdlog.Lshortfile | stdlog.LstdFlags)
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
[global]
|
||||||
|
checkNewVersion = false
|
||||||
|
sendAnonymousUsage = false
|
||||||
|
|
||||||
|
[log]
|
||||||
|
level = "DEBUG"
|
||||||
|
|
||||||
|
[entryPoints]
|
||||||
|
[entryPoints.webA]
|
||||||
|
address = ":8001"
|
||||||
|
[entryPoints.webB]
|
||||||
|
address = ":8002"
|
||||||
|
[entryPoints.webC]
|
||||||
|
address = ":8003"
|
||||||
|
|
||||||
|
[api]
|
||||||
|
insecure = true
|
||||||
|
|
||||||
|
[metrics]
|
||||||
|
[metrics.prometheus]
|
||||||
|
buckets = "0.1,0.3,1.2,5.0"
|
||||||
|
|
||||||
|
[providers]
|
||||||
|
[providers.file]
|
||||||
|
filename = "{{ .SelfFilename }}"
|
||||||
|
|
||||||
|
## dynamic configuration ##
|
||||||
|
|
||||||
|
[http.routers]
|
||||||
|
|
||||||
|
[http.routers.router-without]
|
||||||
|
entrypoints = ["webA"]
|
||||||
|
service = "service-without"
|
||||||
|
rule = "PathPrefix(`/without`)"
|
||||||
|
|
||||||
|
[http.routers.router-req]
|
||||||
|
entrypoints = ["webB"]
|
||||||
|
service = "service-req"
|
||||||
|
rule = "PathPrefix(`/with-req`)"
|
||||||
|
middlewares = ["buffer-req"]
|
||||||
|
|
||||||
|
[http.routers.router-resp]
|
||||||
|
entrypoints = ["webC"]
|
||||||
|
service = "service-resp"
|
||||||
|
rule = "PathPrefix(`/with-resp`)"
|
||||||
|
middlewares = ["buffer-resp"]
|
||||||
|
|
||||||
|
[http.middlewares]
|
||||||
|
[http.middlewares.buffer-req.buffering]
|
||||||
|
maxRequestBodyBytes = 10
|
||||||
|
|
||||||
|
[http.middlewares.buffer-resp.buffering]
|
||||||
|
maxResponseBodyBytes = 10
|
||||||
|
|
||||||
|
[http.services]
|
||||||
|
[http.services.service-without.loadBalancer]
|
||||||
|
[[http.services.service-without.loadBalancer.servers]]
|
||||||
|
url = "http://{{ .IP }}"
|
||||||
|
|
||||||
|
[http.services.service-req.loadBalancer]
|
||||||
|
[[http.services.service-req.loadBalancer.servers]]
|
||||||
|
url = "http://{{ .IP }}"
|
||||||
|
|
||||||
|
[http.services.service-resp.loadBalancer]
|
||||||
|
[[http.services.service-resp.loadBalancer.servers]]
|
||||||
|
url = "http://{{ .IP }}"
|
|
@ -308,7 +308,7 @@ func (s *SimpleSuite) TestMetricsPrometheusDefaultEntryPoint(c *check.C) {
|
||||||
c.Assert(err, checker.IsNil)
|
c.Assert(err, checker.IsNil)
|
||||||
defer s.killCmd(cmd)
|
defer s.killCmd(cmd)
|
||||||
|
|
||||||
err = try.GetRequest("http://127.0.0.1:8080/api/rawdata", 1*time.Second, try.BodyContains("PathPrefix"))
|
err = try.GetRequest("http://127.0.0.1:8080/api/rawdata", 1*time.Second, try.BodyContains("PathPrefix(`/whoami`)"))
|
||||||
c.Assert(err, checker.IsNil)
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
err = try.GetRequest("http://127.0.0.1:8000/whoami", 1*time.Second, try.StatusCodeIs(http.StatusOK))
|
err = try.GetRequest("http://127.0.0.1:8000/whoami", 1*time.Second, try.StatusCodeIs(http.StatusOK))
|
||||||
|
@ -369,6 +369,84 @@ func (s *SimpleSuite) TestMetricsPrometheusTwoRoutersOneService(c *check.C) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestMetricsWithBufferingMiddleware checks that the buffering middleware
|
||||||
|
// (which introduces its own response writer in the chain), does not interfere with
|
||||||
|
// the capture middleware on which the metrics mechanism relies.
|
||||||
|
func (s *SimpleSuite) TestMetricsWithBufferingMiddleware(c *check.C) {
|
||||||
|
s.createComposeProject(c, "base")
|
||||||
|
|
||||||
|
s.composeUp(c)
|
||||||
|
defer s.composeDown(c)
|
||||||
|
|
||||||
|
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
_, _ = w.Write([]byte("MORE THAN TEN BYTES IN RESPONSE"))
|
||||||
|
}))
|
||||||
|
|
||||||
|
server.Start()
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
file := s.adaptFile(c, "fixtures/simple_metrics_with_buffer_middleware.toml", struct{ IP string }{IP: strings.TrimPrefix(server.URL, "http://")})
|
||||||
|
defer os.Remove(file)
|
||||||
|
|
||||||
|
cmd, output := s.traefikCmd(withConfigFile(file))
|
||||||
|
defer output(c)
|
||||||
|
|
||||||
|
err := cmd.Start()
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
defer s.killCmd(cmd)
|
||||||
|
|
||||||
|
err = try.GetRequest("http://127.0.0.1:8080/api/rawdata", 1*time.Second, try.BodyContains("PathPrefix(`/without`)"))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
err = try.GetRequest("http://127.0.0.1:8001/without", 1*time.Second, try.StatusCodeIs(http.StatusOK))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8002/with-req", strings.NewReader("MORE THAN TEN BYTES IN REQUEST"))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// The request should fail because the body is too large.
|
||||||
|
err = try.Request(req, 1*time.Second, try.StatusCodeIs(http.StatusRequestEntityTooLarge))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// The request should fail because the response exceeds the configured limit.
|
||||||
|
err = try.GetRequest("http://127.0.0.1:8003/with-resp", 1*time.Second, try.StatusCodeIs(http.StatusInternalServerError))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
request, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8080/metrics", nil)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
response, err := http.DefaultClient.Do(request)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
c.Assert(response.StatusCode, checker.Equals, http.StatusOK)
|
||||||
|
|
||||||
|
body, err := io.ReadAll(response.Body)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// For allowed requests and responses, the entrypoint and service metrics have the same status code.
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_entrypoint_requests_total{code=\"200\",entrypoint=\"webA\",method=\"GET\",protocol=\"http\"} 1")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_entrypoint_requests_bytes_total{code=\"200\",entrypoint=\"webA\",method=\"GET\",protocol=\"http\"} 0")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_entrypoint_responses_bytes_total{code=\"200\",entrypoint=\"webA\",method=\"GET\",protocol=\"http\"} 31")
|
||||||
|
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_service_requests_total{code=\"200\",method=\"GET\",protocol=\"http\",service=\"service-without@file\"} 1")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_service_requests_bytes_total{code=\"200\",method=\"GET\",protocol=\"http\",service=\"service-without@file\"} 0")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_service_responses_bytes_total{code=\"200\",method=\"GET\",protocol=\"http\",service=\"service-without@file\"} 31")
|
||||||
|
|
||||||
|
// For forbidden requests, the entrypoints have metrics, the services don't.
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_entrypoint_requests_total{code=\"413\",entrypoint=\"webB\",method=\"GET\",protocol=\"http\"} 1")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_entrypoint_requests_bytes_total{code=\"413\",entrypoint=\"webB\",method=\"GET\",protocol=\"http\"} 0")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_entrypoint_responses_bytes_total{code=\"413\",entrypoint=\"webB\",method=\"GET\",protocol=\"http\"} 24")
|
||||||
|
|
||||||
|
// For disallowed responses, the entrypoint and service metrics don't have the same status code.
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_entrypoint_requests_bytes_total{code=\"500\",entrypoint=\"webC\",method=\"GET\",protocol=\"http\"} 0")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_entrypoint_requests_total{code=\"500\",entrypoint=\"webC\",method=\"GET\",protocol=\"http\"} 1")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_entrypoint_responses_bytes_total{code=\"500\",entrypoint=\"webC\",method=\"GET\",protocol=\"http\"} 21")
|
||||||
|
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_service_requests_bytes_total{code=\"200\",method=\"GET\",protocol=\"http\",service=\"service-resp@file\"} 0")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_service_requests_total{code=\"200\",method=\"GET\",protocol=\"http\",service=\"service-resp@file\"} 1")
|
||||||
|
c.Assert(string(body), checker.Contains, "traefik_service_responses_bytes_total{code=\"200\",method=\"GET\",protocol=\"http\",service=\"service-resp@file\"} 31")
|
||||||
|
}
|
||||||
|
|
||||||
func (s *SimpleSuite) TestMultipleProviderSameBackendName(c *check.C) {
|
func (s *SimpleSuite) TestMultipleProviderSameBackendName(c *check.C) {
|
||||||
s.createComposeProject(c, "base")
|
s.createComposeProject(c, "base")
|
||||||
|
|
||||||
|
|
|
@ -227,6 +227,15 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http
|
||||||
core[ClientHost] = forwardedFor
|
core[ClientHost] = forwardedFor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := req.Context()
|
||||||
|
capt, err := capture.FromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.FromContext(log.With(ctx, log.Str(log.MiddlewareType, "AccessLogs"))).
|
||||||
|
WithError(err).
|
||||||
|
Errorf("Could not get Capture")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
next.ServeHTTP(rw, reqWithDataTable)
|
next.ServeHTTP(rw, reqWithDataTable)
|
||||||
|
|
||||||
if _, ok := core[ClientUsername]; !ok {
|
if _, ok := core[ClientUsername]; !ok {
|
||||||
|
@ -237,13 +246,6 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http
|
||||||
headers: rw.Header().Clone(),
|
headers: rw.Header().Clone(),
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := req.Context()
|
|
||||||
capt, err := capture.FromContext(ctx)
|
|
||||||
if err != nil {
|
|
||||||
log.FromContext(log.With(ctx, log.Str(log.MiddlewareType, "AccessLogs"))).Errorf("Could not get Capture: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
logDataTable.DownstreamResponse.status = capt.StatusCode()
|
logDataTable.DownstreamResponse.status = capt.StatusCode()
|
||||||
logDataTable.DownstreamResponse.size = capt.ResponseSize()
|
logDataTable.DownstreamResponse.size = capt.ResponseSize()
|
||||||
logDataTable.Request.size = capt.RequestSize()
|
logDataTable.Request.size = capt.RequestSize()
|
||||||
|
|
|
@ -57,7 +57,7 @@ func TestLogRotation(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
chain := alice.New()
|
chain := alice.New()
|
||||||
chain = chain.Append(capture.WrapHandler(&capture.Handler{}))
|
chain = chain.Append(capture.Wrap)
|
||||||
chain = chain.Append(WrapHandler(logHandler))
|
chain = chain.Append(WrapHandler(logHandler))
|
||||||
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
rw.WriteHeader(http.StatusOK)
|
rw.WriteHeader(http.StatusOK)
|
||||||
|
@ -210,7 +210,7 @@ func TestLoggerHeaderFields(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
chain := alice.New()
|
chain := alice.New()
|
||||||
chain = chain.Append(capture.WrapHandler(&capture.Handler{}))
|
chain = chain.Append(capture.Wrap)
|
||||||
chain = chain.Append(WrapHandler(logger))
|
chain = chain.Append(WrapHandler(logger))
|
||||||
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
rw.WriteHeader(http.StatusOK)
|
rw.WriteHeader(http.StatusOK)
|
||||||
|
@ -784,7 +784,7 @@ func doLoggingTLSOpt(t *testing.T, config *types.AccessLog, enableTLS bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
chain := alice.New()
|
chain := alice.New()
|
||||||
chain = chain.Append(capture.WrapHandler(&capture.Handler{}))
|
chain = chain.Append(capture.Wrap)
|
||||||
chain = chain.Append(WrapHandler(logger))
|
chain = chain.Append(WrapHandler(logger))
|
||||||
handler, err := chain.Then(http.HandlerFunc(logWriterTestHandlerFunc))
|
handler, err := chain.Then(http.HandlerFunc(logWriterTestHandlerFunc))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -3,9 +3,8 @@
|
||||||
// For another middleware to get those attributes of a request/response, this middleware
|
// For another middleware to get those attributes of a request/response, this middleware
|
||||||
// should be added before in the middleware chain.
|
// should be added before in the middleware chain.
|
||||||
//
|
//
|
||||||
// handler, _ := NewHandler()
|
|
||||||
// chain := alice.New().
|
// chain := alice.New().
|
||||||
// Append(WrapHandler(handler)).
|
// Append(capture.Wrap).
|
||||||
// Append(myOtherMiddleware).
|
// Append(myOtherMiddleware).
|
||||||
// then(...)
|
// then(...)
|
||||||
//
|
//
|
||||||
|
@ -33,7 +32,6 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/containous/alice"
|
|
||||||
"github.com/traefik/traefik/v2/pkg/middlewares"
|
"github.com/traefik/traefik/v2/pkg/middlewares"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -41,62 +39,67 @@ type key string
|
||||||
|
|
||||||
const capturedData key = "capturedData"
|
const capturedData key = "capturedData"
|
||||||
|
|
||||||
// Handler will store each request data to its context.
|
// Wrap returns a new handler that inserts a Capture into the given handler.
|
||||||
type Handler struct{}
|
// It satisfies the alice.Constructor type.
|
||||||
|
func Wrap(handler http.Handler) (http.Handler, error) {
|
||||||
// WrapHandler wraps capture handler into an Alice Constructor.
|
c := Capture{}
|
||||||
func WrapHandler(handler *Handler) alice.Constructor {
|
return c.Reset(handler), nil
|
||||||
return func(next http.Handler) (http.Handler, error) {
|
|
||||||
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
|
||||||
handler.ServeHTTP(rw, req, next)
|
|
||||||
}), nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http.Handler) {
|
// FromContext returns the Capture value found in ctx, or an empty Capture otherwise.
|
||||||
c := Capture{}
|
func FromContext(ctx context.Context) (Capture, error) {
|
||||||
if req.Body != nil {
|
c := ctx.Value(capturedData)
|
||||||
readCounter := &readCounter{source: req.Body}
|
if c == nil {
|
||||||
c.rr = readCounter
|
return Capture{}, errors.New("value not found in context")
|
||||||
req.Body = readCounter
|
|
||||||
}
|
}
|
||||||
responseWriter := newResponseWriter(rw)
|
capt, ok := c.(*Capture)
|
||||||
c.rw = responseWriter
|
if !ok {
|
||||||
ctx := context.WithValue(req.Context(), capturedData, &c)
|
return Capture{}, errors.New("value stored in context is not a *Capture")
|
||||||
next.ServeHTTP(responseWriter, req.WithContext(ctx))
|
}
|
||||||
|
return *capt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Capture is the object populated by the capture middleware,
|
// Capture is the object populated by the capture middleware,
|
||||||
// allowing to gather information about the request and response.
|
// holding probes that allow to gather information about the request and response.
|
||||||
type Capture struct {
|
type Capture struct {
|
||||||
rr *readCounter
|
rr *readCounter
|
||||||
rw responseWriter
|
rw responseWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
// FromContext returns the Capture value found in ctx, or an empty Capture otherwise.
|
// NeedsReset returns whether the given http.ResponseWriter is the capture's probe.
|
||||||
func FromContext(ctx context.Context) (*Capture, error) {
|
func (c *Capture) NeedsReset(rw http.ResponseWriter) bool {
|
||||||
c := ctx.Value(capturedData)
|
return c.rw != rw
|
||||||
if c == nil {
|
|
||||||
return nil, errors.New("value not found")
|
|
||||||
}
|
|
||||||
capt, ok := c.(*Capture)
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("value stored in Context is not a *Capture")
|
|
||||||
}
|
|
||||||
return capt, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Capture) ResponseSize() int64 {
|
// Reset returns a new handler that renews the Capture's probes, and inserts
|
||||||
|
// them when deferring to next.
|
||||||
|
func (c *Capture) Reset(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
ctx := context.WithValue(req.Context(), capturedData, c)
|
||||||
|
newReq := req.WithContext(ctx)
|
||||||
|
|
||||||
|
if newReq.Body != nil {
|
||||||
|
readCounter := &readCounter{source: newReq.Body}
|
||||||
|
c.rr = readCounter
|
||||||
|
newReq.Body = readCounter
|
||||||
|
}
|
||||||
|
c.rw = newResponseWriter(rw)
|
||||||
|
|
||||||
|
next.ServeHTTP(c.rw, newReq)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Capture) ResponseSize() int64 {
|
||||||
return c.rw.Size()
|
return c.rw.Size()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Capture) StatusCode() int {
|
func (c *Capture) StatusCode() int {
|
||||||
return c.rw.Status()
|
return c.rw.Status()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequestSize returns the size of the request's body if it applies,
|
// RequestSize returns the size of the request's body if it applies,
|
||||||
// zero otherwise.
|
// zero otherwise.
|
||||||
func (c Capture) RequestSize() int64 {
|
func (c *Capture) RequestSize() int64 {
|
||||||
if c.rr == nil {
|
if c.rr == nil {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,9 +38,8 @@ func TestCapture(t *testing.T) {
|
||||||
assert.Equal(t, "bar", string(all))
|
assert.Equal(t, "bar", string(all))
|
||||||
})
|
})
|
||||||
|
|
||||||
wrapped := WrapHandler(&Handler{})
|
|
||||||
chain := alice.New()
|
chain := alice.New()
|
||||||
chain = chain.Append(wrapped)
|
chain = chain.Append(Wrap)
|
||||||
chain = chain.Append(wrapMiddleware)
|
chain = chain.Append(wrapMiddleware)
|
||||||
handlers, err := chain.Then(handler)
|
handlers, err := chain.Then(handler)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -142,8 +141,7 @@ func BenchmarkCapture(b *testing.B) {
|
||||||
|
|
||||||
chain := alice.New()
|
chain := alice.New()
|
||||||
if test.capture || test.body {
|
if test.capture || test.body {
|
||||||
captureWrapped := WrapHandler(&Handler{})
|
chain = chain.Append(Wrap)
|
||||||
chain = chain.Append(captureWrapped)
|
|
||||||
}
|
}
|
||||||
handlers, err := chain.Then(next)
|
handlers, err := chain.Then(next)
|
||||||
require.NoError(b, err)
|
require.NoError(b, err)
|
||||||
|
|
|
@ -24,6 +24,7 @@ const (
|
||||||
protoWebsocket = "websocket"
|
protoWebsocket = "websocket"
|
||||||
typeName = "Metrics"
|
typeName = "Metrics"
|
||||||
nameEntrypoint = "metrics-entrypoint"
|
nameEntrypoint = "metrics-entrypoint"
|
||||||
|
nameRouter = "metrics-router"
|
||||||
nameService = "metrics-service"
|
nameService = "metrics-service"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -56,7 +57,7 @@ func NewEntryPointMiddleware(ctx context.Context, next http.Handler, registry me
|
||||||
|
|
||||||
// NewRouterMiddleware creates a new metrics middleware for a Router.
|
// NewRouterMiddleware creates a new metrics middleware for a Router.
|
||||||
func NewRouterMiddleware(ctx context.Context, next http.Handler, registry metrics.Registry, routerName string, serviceName string) http.Handler {
|
func NewRouterMiddleware(ctx context.Context, next http.Handler, registry metrics.Registry, routerName string, serviceName string) http.Handler {
|
||||||
log.FromContext(middlewares.GetLoggerCtx(ctx, nameEntrypoint, typeName)).Debug("Creating middleware")
|
log.FromContext(middlewares.GetLoggerCtx(ctx, nameRouter, typeName)).Debug("Creating middleware")
|
||||||
|
|
||||||
return &metricsMiddleware{
|
return &metricsMiddleware{
|
||||||
next: next,
|
next: next,
|
||||||
|
@ -125,17 +126,25 @@ func (m *metricsMiddleware) ServeHTTP(rw http.ResponseWriter, req *http.Request)
|
||||||
m.reqsTLSCounter.With(tlsLabels...).Add(1)
|
m.reqsTLSCounter.With(tlsLabels...).Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
|
||||||
|
|
||||||
m.next.ServeHTTP(rw, req)
|
|
||||||
|
|
||||||
ctx := req.Context()
|
ctx := req.Context()
|
||||||
|
|
||||||
capt, err := capture.FromContext(ctx)
|
capt, err := capture.FromContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.FromContext(middlewares.GetLoggerCtx(ctx, nameEntrypoint, typeName)).Errorf("Could not get Capture: %w", err)
|
for i := 0; i < len(m.baseLabels); i += 2 {
|
||||||
|
ctx = log.With(ctx, log.Str(m.baseLabels[i], m.baseLabels[i+1]))
|
||||||
|
}
|
||||||
|
log.FromContext(ctx).WithError(err).Errorf("Could not get Capture")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
next := m.next
|
||||||
|
if capt.NeedsReset(rw) {
|
||||||
|
next = capt.Reset(m.next)
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
next.ServeHTTP(rw, req)
|
||||||
|
|
||||||
labels = append(labels, "code", strconv.Itoa(capt.StatusCode()))
|
labels = append(labels, "code", strconv.Itoa(capt.StatusCode()))
|
||||||
m.reqDurationHistogram.With(labels...).ObserveFromStart(start)
|
m.reqDurationHistogram.With(labels...).ObserveFromStart(start)
|
||||||
m.reqsCounter.With(labels...).Add(1)
|
m.reqsCounter.With(labels...).Add(1)
|
||||||
|
|
|
@ -18,16 +18,14 @@ type ChainBuilder struct {
|
||||||
metricsRegistry metrics.Registry
|
metricsRegistry metrics.Registry
|
||||||
accessLoggerMiddleware *accesslog.Handler
|
accessLoggerMiddleware *accesslog.Handler
|
||||||
tracer *tracing.Tracing
|
tracer *tracing.Tracing
|
||||||
captureMiddleware *capture.Handler
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewChainBuilder Creates a new ChainBuilder.
|
// NewChainBuilder Creates a new ChainBuilder.
|
||||||
func NewChainBuilder(metricsRegistry metrics.Registry, accessLoggerMiddleware *accesslog.Handler, tracer *tracing.Tracing, captureMiddleware *capture.Handler) *ChainBuilder {
|
func NewChainBuilder(metricsRegistry metrics.Registry, accessLoggerMiddleware *accesslog.Handler, tracer *tracing.Tracing) *ChainBuilder {
|
||||||
return &ChainBuilder{
|
return &ChainBuilder{
|
||||||
metricsRegistry: metricsRegistry,
|
metricsRegistry: metricsRegistry,
|
||||||
accessLoggerMiddleware: accessLoggerMiddleware,
|
accessLoggerMiddleware: accessLoggerMiddleware,
|
||||||
tracer: tracer,
|
tracer: tracer,
|
||||||
captureMiddleware: captureMiddleware,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,8 +33,8 @@ func NewChainBuilder(metricsRegistry metrics.Registry, accessLoggerMiddleware *a
|
||||||
func (c *ChainBuilder) Build(ctx context.Context, entryPointName string) alice.Chain {
|
func (c *ChainBuilder) Build(ctx context.Context, entryPointName string) alice.Chain {
|
||||||
chain := alice.New()
|
chain := alice.New()
|
||||||
|
|
||||||
if c.captureMiddleware != nil {
|
if c.accessLoggerMiddleware != nil || c.metricsRegistry != nil && (c.metricsRegistry.IsEpEnabled() || c.metricsRegistry.IsRouterEnabled() || c.metricsRegistry.IsSvcEnabled()) {
|
||||||
chain = chain.Append(capture.WrapHandler(c.captureMiddleware))
|
chain = chain.Append(capture.Wrap)
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.accessLoggerMiddleware != nil {
|
if c.accessLoggerMiddleware != nil {
|
||||||
|
|
|
@ -316,7 +316,7 @@ func TestRouterManager_Get(t *testing.T) {
|
||||||
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
|
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
|
||||||
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
|
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
|
||||||
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
||||||
chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil)
|
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
|
||||||
|
|
||||||
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
||||||
|
|
||||||
|
@ -422,7 +422,7 @@ func TestAccessLog(t *testing.T) {
|
||||||
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
|
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
|
||||||
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
|
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
|
||||||
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
||||||
chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil)
|
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
|
||||||
|
|
||||||
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
||||||
|
|
||||||
|
@ -439,7 +439,7 @@ func TestAccessLog(t *testing.T) {
|
||||||
reqHost := requestdecorator.New(nil)
|
reqHost := requestdecorator.New(nil)
|
||||||
|
|
||||||
chain := alice.New()
|
chain := alice.New()
|
||||||
chain = chain.Append(capture.WrapHandler(&capture.Handler{}))
|
chain = chain.Append(capture.Wrap)
|
||||||
chain = chain.Append(accesslog.WrapHandler(accesslogger))
|
chain = chain.Append(accesslog.WrapHandler(accesslogger))
|
||||||
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
reqHost.ServeHTTP(w, req, handlers["web"].ServeHTTP)
|
reqHost.ServeHTTP(w, req, handlers["web"].ServeHTTP)
|
||||||
|
@ -717,7 +717,7 @@ func TestRuntimeConfiguration(t *testing.T) {
|
||||||
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
|
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
|
||||||
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
|
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
|
||||||
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
||||||
chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil)
|
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
|
||||||
|
|
||||||
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
||||||
|
|
||||||
|
@ -792,7 +792,7 @@ func TestProviderOnMiddlewares(t *testing.T) {
|
||||||
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
|
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
|
||||||
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
|
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
|
||||||
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
||||||
chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil)
|
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
|
||||||
|
|
||||||
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
||||||
|
|
||||||
|
@ -860,7 +860,7 @@ func BenchmarkRouterServe(b *testing.B) {
|
||||||
|
|
||||||
serviceManager := service.NewManager(rtConf.Services, nil, nil, staticRoundTripperGetter{res})
|
serviceManager := service.NewManager(rtConf.Services, nil, nil, staticRoundTripperGetter{res})
|
||||||
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
|
||||||
chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil)
|
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
|
||||||
|
|
||||||
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry())
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,7 @@ func TestReuseService(t *testing.T) {
|
||||||
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil)
|
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil)
|
||||||
tlsManager := tls.NewManager()
|
tlsManager := tls.NewManager()
|
||||||
|
|
||||||
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil, nil), nil, metrics.NewVoidRegistry())
|
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil), nil, metrics.NewVoidRegistry())
|
||||||
|
|
||||||
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs}))
|
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs}))
|
||||||
|
|
||||||
|
@ -189,7 +189,7 @@ func TestServerResponseEmptyBackend(t *testing.T) {
|
||||||
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil)
|
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil)
|
||||||
tlsManager := tls.NewManager()
|
tlsManager := tls.NewManager()
|
||||||
|
|
||||||
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil, nil), nil, metrics.NewVoidRegistry())
|
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil), nil, metrics.NewVoidRegistry())
|
||||||
|
|
||||||
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: test.config(testServer.URL)}))
|
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: test.config(testServer.URL)}))
|
||||||
|
|
||||||
|
@ -232,7 +232,7 @@ func TestInternalServices(t *testing.T) {
|
||||||
|
|
||||||
voidRegistry := metrics.NewVoidRegistry()
|
voidRegistry := metrics.NewVoidRegistry()
|
||||||
|
|
||||||
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(voidRegistry, nil, nil, nil), nil, voidRegistry)
|
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(voidRegistry, nil, nil), nil, voidRegistry)
|
||||||
|
|
||||||
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs}))
|
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs}))
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue