Support gRPC and gRPC-Web protocol in metrics
This commit is contained in:
parent
d131ef57da
commit
240fb871b6
3 changed files with 142 additions and 2 deletions
|
@ -16,3 +16,7 @@ feature by feature, of how the configuration looked like in v2, and how it now l
|
||||||
## IPWhiteList
|
## IPWhiteList
|
||||||
|
|
||||||
In v3, we renamed the `IPWhiteList` middleware to `IPAllowList` without changing anything to the configuration.
|
In v3, we renamed the `IPWhiteList` middleware to `IPAllowList` without changing anything to the configuration.
|
||||||
|
|
||||||
|
## gRPC Metrics
|
||||||
|
|
||||||
|
In v3, the reported status code for gRPC requests is now the value of the `Grpc-Status` header.
|
||||||
|
|
|
@ -16,10 +16,13 @@ import (
|
||||||
"github.com/traefik/traefik/v2/pkg/middlewares/capture"
|
"github.com/traefik/traefik/v2/pkg/middlewares/capture"
|
||||||
"github.com/traefik/traefik/v2/pkg/middlewares/retry"
|
"github.com/traefik/traefik/v2/pkg/middlewares/retry"
|
||||||
traefiktls "github.com/traefik/traefik/v2/pkg/tls"
|
traefiktls "github.com/traefik/traefik/v2/pkg/tls"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
protoHTTP = "http"
|
protoHTTP = "http"
|
||||||
|
protoGRPC = "grpc"
|
||||||
|
protoGRPCWeb = "grpc-web"
|
||||||
protoSSE = "sse"
|
protoSSE = "sse"
|
||||||
protoWebsocket = "websocket"
|
protoWebsocket = "websocket"
|
||||||
typeName = "Metrics"
|
typeName = "Metrics"
|
||||||
|
@ -109,9 +112,12 @@ func WrapServiceHandler(ctx context.Context, registry metrics.Registry, serviceN
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *metricsMiddleware) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
func (m *metricsMiddleware) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
proto := getRequestProtocol(req)
|
||||||
|
|
||||||
var labels []string
|
var labels []string
|
||||||
labels = append(labels, m.baseLabels...)
|
labels = append(labels, m.baseLabels...)
|
||||||
labels = append(labels, "method", getMethod(req), "protocol", getRequestProtocol(req))
|
labels = append(labels, "method", getMethod(req))
|
||||||
|
labels = append(labels, "protocol", proto)
|
||||||
|
|
||||||
openConnsGauge := m.openConnsGauge.With(labels...)
|
openConnsGauge := m.openConnsGauge.With(labels...)
|
||||||
openConnsGauge.Add(1)
|
openConnsGauge.Add(1)
|
||||||
|
@ -145,7 +151,12 @@ func (m *metricsMiddleware) ServeHTTP(rw http.ResponseWriter, req *http.Request)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
next.ServeHTTP(rw, req)
|
next.ServeHTTP(rw, req)
|
||||||
|
|
||||||
labels = append(labels, "code", strconv.Itoa(capt.StatusCode()))
|
code := capt.StatusCode()
|
||||||
|
if proto == protoGRPC || proto == protoGRPCWeb {
|
||||||
|
code = grpcStatusCode(rw)
|
||||||
|
}
|
||||||
|
|
||||||
|
labels = append(labels, "code", strconv.Itoa(code))
|
||||||
m.reqDurationHistogram.With(labels...).ObserveFromStart(start)
|
m.reqDurationHistogram.With(labels...).ObserveFromStart(start)
|
||||||
m.reqsCounter.With(labels...).Add(1)
|
m.reqsCounter.With(labels...).Add(1)
|
||||||
m.respsBytesCounter.With(labels...).Add(float64(capt.ResponseSize()))
|
m.respsBytesCounter.With(labels...).Add(float64(capt.ResponseSize()))
|
||||||
|
@ -158,6 +169,10 @@ func getRequestProtocol(req *http.Request) string {
|
||||||
return protoWebsocket
|
return protoWebsocket
|
||||||
case isSSERequest(req):
|
case isSSERequest(req):
|
||||||
return protoSSE
|
return protoSSE
|
||||||
|
case isGRPCWebRequest(req):
|
||||||
|
return protoGRPCWeb
|
||||||
|
case isGRPCRequest(req):
|
||||||
|
return protoGRPC
|
||||||
default:
|
default:
|
||||||
return protoHTTP
|
return protoHTTP
|
||||||
}
|
}
|
||||||
|
@ -173,6 +188,27 @@ func isSSERequest(req *http.Request) bool {
|
||||||
return containsHeader(req, "Accept", "text/event-stream")
|
return containsHeader(req, "Accept", "text/event-stream")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isGRPCWebRequest determines if the specified HTTP request is a gRPC-Web request.
|
||||||
|
func isGRPCWebRequest(req *http.Request) bool {
|
||||||
|
return strings.HasPrefix(req.Header.Get("Content-Type"), "application/grpc-web")
|
||||||
|
}
|
||||||
|
|
||||||
|
// isGRPCRequest determines if the specified HTTP request is a gRPC request.
|
||||||
|
func isGRPCRequest(req *http.Request) bool {
|
||||||
|
return strings.HasPrefix(req.Header.Get("Content-Type"), "application/grpc")
|
||||||
|
}
|
||||||
|
|
||||||
|
// grpcStatusCode parses and returns the gRPC status code from the Grpc-Status header.
|
||||||
|
func grpcStatusCode(rw http.ResponseWriter) int {
|
||||||
|
code := codes.Unknown
|
||||||
|
if status := rw.Header().Get("Grpc-Status"); status != "" {
|
||||||
|
if err := code.UnmarshalJSON([]byte(status)); err != nil {
|
||||||
|
return int(code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return int(code)
|
||||||
|
}
|
||||||
|
|
||||||
func containsHeader(req *http.Request, name, value string) bool {
|
func containsHeader(req *http.Request, name, value string) bool {
|
||||||
items := strings.Split(req.Header.Get(name), ",")
|
items := strings.Split(req.Header.Get(name), ",")
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/go-kit/kit/metrics"
|
"github.com/go-kit/kit/metrics"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CollectingCounter is a metrics.Counter implementation that enables access to the CounterValue and LastLabelValues.
|
// CollectingCounter is a metrics.Counter implementation that enables access to the CounterValue and LastLabelValues.
|
||||||
|
@ -129,3 +130,102 @@ func Test_getMethod(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_getRequestProtocol(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
desc string
|
||||||
|
headers http.Header
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "default",
|
||||||
|
expected: protoHTTP,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "websocket",
|
||||||
|
headers: http.Header{
|
||||||
|
"Connection": []string{"upgrade"},
|
||||||
|
"Upgrade": []string{"websocket"},
|
||||||
|
},
|
||||||
|
expected: protoWebsocket,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "SSE",
|
||||||
|
headers: http.Header{
|
||||||
|
"Accept": []string{"text/event-stream"},
|
||||||
|
},
|
||||||
|
expected: protoSSE,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "grpc web",
|
||||||
|
headers: http.Header{
|
||||||
|
"Content-Type": []string{"application/grpc-web-text"},
|
||||||
|
},
|
||||||
|
expected: protoGRPCWeb,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "grpc",
|
||||||
|
headers: http.Header{
|
||||||
|
"Content-Type": []string{"application/grpc-text"},
|
||||||
|
},
|
||||||
|
expected: protoGRPC,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range testCases {
|
||||||
|
test := test
|
||||||
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "https://localhost", http.NoBody)
|
||||||
|
req.Header = test.headers
|
||||||
|
|
||||||
|
protocol := getRequestProtocol(req)
|
||||||
|
|
||||||
|
assert.Equal(t, test.expected, protocol)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_grpcStatusCode(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
desc string
|
||||||
|
status string
|
||||||
|
expected codes.Code
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "invalid number",
|
||||||
|
status: "foo",
|
||||||
|
expected: codes.Unknown,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "number",
|
||||||
|
status: "1",
|
||||||
|
expected: codes.Canceled,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "invalid string",
|
||||||
|
status: `"foo"`,
|
||||||
|
expected: codes.Unknown,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "string",
|
||||||
|
status: `"OK"`,
|
||||||
|
expected: codes.OK,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range testCases {
|
||||||
|
test := test
|
||||||
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
rw := httptest.NewRecorder()
|
||||||
|
rw.Header().Set("Grpc-Status", test.status)
|
||||||
|
|
||||||
|
code := grpcStatusCode(rw)
|
||||||
|
|
||||||
|
assert.EqualValues(t, test.expected, code)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue