178 lines
4.8 KiB
Go
178 lines
4.8 KiB
Go
|
// Copyright (c) 2017 Uber Technologies, Inc.
|
||
|
//
|
||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||
|
// of this software and associated documentation files (the "Software"), to deal
|
||
|
// in the Software without restriction, including without limitation the rights
|
||
|
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||
|
// copies of the Software, and to permit persons to whom the Software is
|
||
|
// furnished to do so, subject to the following conditions:
|
||
|
//
|
||
|
// The above copyright notice and this permission notice shall be included in
|
||
|
// all copies or substantial portions of the Software.
|
||
|
//
|
||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||
|
// THE SOFTWARE.
|
||
|
|
||
|
package rpcmetrics
|
||
|
|
||
|
import (
|
||
|
"strconv"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/opentracing/opentracing-go"
|
||
|
"github.com/opentracing/opentracing-go/ext"
|
||
|
"github.com/uber/jaeger-lib/metrics"
|
||
|
|
||
|
jaeger "github.com/uber/jaeger-client-go"
|
||
|
)
|
||
|
|
||
|
// defaultMaxNumberOfEndpoints is the endpoint limit that NewObserver hands to
// newMetricsByEndpoint.
// NOTE(review): presumably this caps how many distinct endpoint names get their
// own metrics — confirm against newMetricsByEndpoint.
const defaultMaxNumberOfEndpoints = 200
|
||
|
|
||
|
// Observer is an observer that can emit RPC metrics.
|
||
|
type Observer struct {
|
||
|
metricsByEndpoint *MetricsByEndpoint
|
||
|
}
|
||
|
|
||
|
// NewObserver creates a new observer that can emit RPC metrics.
|
||
|
func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer {
|
||
|
return &Observer{
|
||
|
metricsByEndpoint: newMetricsByEndpoint(
|
||
|
metricsFactory,
|
||
|
normalizer,
|
||
|
defaultMaxNumberOfEndpoints,
|
||
|
),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// OnStartSpan creates a new Observer for the span.
|
||
|
func (o *Observer) OnStartSpan(
|
||
|
operationName string,
|
||
|
options opentracing.StartSpanOptions,
|
||
|
) jaeger.SpanObserver {
|
||
|
return NewSpanObserver(o.metricsByEndpoint, operationName, options)
|
||
|
}
|
||
|
|
||
|
// SpanKind identifies the span as inbound, outbound, or internal.
type SpanKind int

// The span kinds are numbered from zero, so the zero value of SpanKind is
// Local.
const (
	// Local is the kind of a span that is neither inbound nor outbound.
	Local SpanKind = iota
	// Inbound is the kind of a span for a request received by this process.
	Inbound
	// Outbound is the kind of a span for a request sent by this process.
	Outbound
)
|
||
|
|
||
|
// SpanObserver collects RPC metrics
|
||
|
type SpanObserver struct {
|
||
|
metricsByEndpoint *MetricsByEndpoint
|
||
|
operationName string
|
||
|
startTime time.Time
|
||
|
mux sync.Mutex
|
||
|
kind SpanKind
|
||
|
httpStatusCode uint16
|
||
|
err bool
|
||
|
}
|
||
|
|
||
|
// NewSpanObserver creates a new SpanObserver that can emit RPC metrics.
|
||
|
func NewSpanObserver(
|
||
|
metricsByEndpoint *MetricsByEndpoint,
|
||
|
operationName string,
|
||
|
options opentracing.StartSpanOptions,
|
||
|
) *SpanObserver {
|
||
|
so := &SpanObserver{
|
||
|
metricsByEndpoint: metricsByEndpoint,
|
||
|
operationName: operationName,
|
||
|
startTime: options.StartTime,
|
||
|
}
|
||
|
for k, v := range options.Tags {
|
||
|
so.handleTagInLock(k, v)
|
||
|
}
|
||
|
return so
|
||
|
}
|
||
|
|
||
|
// handleTags watches for special tags
|
||
|
// - SpanKind
|
||
|
// - HttpStatusCode
|
||
|
// - Error
|
||
|
func (so *SpanObserver) handleTagInLock(key string, value interface{}) {
|
||
|
if key == string(ext.SpanKind) {
|
||
|
if v, ok := value.(ext.SpanKindEnum); ok {
|
||
|
value = string(v)
|
||
|
}
|
||
|
if v, ok := value.(string); ok {
|
||
|
if v == string(ext.SpanKindRPCClientEnum) {
|
||
|
so.kind = Outbound
|
||
|
} else if v == string(ext.SpanKindRPCServerEnum) {
|
||
|
so.kind = Inbound
|
||
|
}
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
if key == string(ext.HTTPStatusCode) {
|
||
|
if v, ok := value.(uint16); ok {
|
||
|
so.httpStatusCode = v
|
||
|
} else if v, ok := value.(int); ok {
|
||
|
so.httpStatusCode = uint16(v)
|
||
|
} else if v, ok := value.(string); ok {
|
||
|
if vv, err := strconv.Atoi(v); err == nil {
|
||
|
so.httpStatusCode = uint16(vv)
|
||
|
}
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
if key == string(ext.Error) {
|
||
|
if v, ok := value.(bool); ok {
|
||
|
so.err = v
|
||
|
} else if v, ok := value.(string); ok {
|
||
|
if vv, err := strconv.ParseBool(v); err == nil {
|
||
|
so.err = vv
|
||
|
}
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// OnFinish emits the RPC metrics. It only has an effect when operation name
|
||
|
// is not blank, and the span kind is an RPC server.
|
||
|
func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) {
|
||
|
so.mux.Lock()
|
||
|
defer so.mux.Unlock()
|
||
|
|
||
|
if so.operationName == "" || so.kind != Inbound {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
mets := so.metricsByEndpoint.get(so.operationName)
|
||
|
latency := options.FinishTime.Sub(so.startTime)
|
||
|
if so.err {
|
||
|
mets.RequestCountFailures.Inc(1)
|
||
|
mets.RequestLatencyFailures.Record(latency)
|
||
|
} else {
|
||
|
mets.RequestCountSuccess.Inc(1)
|
||
|
mets.RequestLatencySuccess.Record(latency)
|
||
|
}
|
||
|
mets.recordHTTPStatusCode(so.httpStatusCode)
|
||
|
}
|
||
|
|
||
|
// OnSetOperationName records new operation name.
|
||
|
func (so *SpanObserver) OnSetOperationName(operationName string) {
|
||
|
so.mux.Lock()
|
||
|
so.operationName = operationName
|
||
|
so.mux.Unlock()
|
||
|
}
|
||
|
|
||
|
// OnSetTag implements SpanObserver
|
||
|
func (so *SpanObserver) OnSetTag(key string, value interface{}) {
|
||
|
so.mux.Lock()
|
||
|
so.handleTagInLock(key, value)
|
||
|
so.mux.Unlock()
|
||
|
}
|