traefik/vendor/github.com/uber/jaeger-client-go/rpcmetrics/observer.go
2018-01-10 17:48:04 +01:00

177 lines
4.8 KiB
Go

// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package rpcmetrics
import (
"strconv"
"sync"
"time"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/uber/jaeger-lib/metrics"
jaeger "github.com/uber/jaeger-client-go"
)
const defaultMaxNumberOfEndpoints = 200
// Observer is an observer that can emit RPC metrics.
type Observer struct {
metricsByEndpoint *MetricsByEndpoint
}
// NewObserver creates a new observer that can emit RPC metrics.
func NewObserver(metricsFactory metrics.Factory, normalizer NameNormalizer) *Observer {
return &Observer{
metricsByEndpoint: newMetricsByEndpoint(
metricsFactory,
normalizer,
defaultMaxNumberOfEndpoints,
),
}
}
// OnStartSpan creates a new Observer for the span.
func (o *Observer) OnStartSpan(
operationName string,
options opentracing.StartSpanOptions,
) jaeger.SpanObserver {
return NewSpanObserver(o.metricsByEndpoint, operationName, options)
}
// SpanKind identifies the span as inboud, outbound, or internal
type SpanKind int
const (
// Local span kind
Local SpanKind = iota
// Inbound span kind
Inbound
// Outbound span kind
Outbound
)
// SpanObserver collects RPC metrics
type SpanObserver struct {
metricsByEndpoint *MetricsByEndpoint
operationName string
startTime time.Time
mux sync.Mutex
kind SpanKind
httpStatusCode uint16
err bool
}
// NewSpanObserver creates a new SpanObserver that can emit RPC metrics.
func NewSpanObserver(
metricsByEndpoint *MetricsByEndpoint,
operationName string,
options opentracing.StartSpanOptions,
) *SpanObserver {
so := &SpanObserver{
metricsByEndpoint: metricsByEndpoint,
operationName: operationName,
startTime: options.StartTime,
}
for k, v := range options.Tags {
so.handleTagInLock(k, v)
}
return so
}
// handleTags watches for special tags
// - SpanKind
// - HttpStatusCode
// - Error
func (so *SpanObserver) handleTagInLock(key string, value interface{}) {
if key == string(ext.SpanKind) {
if v, ok := value.(ext.SpanKindEnum); ok {
value = string(v)
}
if v, ok := value.(string); ok {
if v == string(ext.SpanKindRPCClientEnum) {
so.kind = Outbound
} else if v == string(ext.SpanKindRPCServerEnum) {
so.kind = Inbound
}
}
return
}
if key == string(ext.HTTPStatusCode) {
if v, ok := value.(uint16); ok {
so.httpStatusCode = v
} else if v, ok := value.(int); ok {
so.httpStatusCode = uint16(v)
} else if v, ok := value.(string); ok {
if vv, err := strconv.Atoi(v); err == nil {
so.httpStatusCode = uint16(vv)
}
}
return
}
if key == string(ext.Error) {
if v, ok := value.(bool); ok {
so.err = v
} else if v, ok := value.(string); ok {
if vv, err := strconv.ParseBool(v); err == nil {
so.err = vv
}
}
return
}
}
// OnFinish emits the RPC metrics. It only has an effect when operation name
// is not blank, and the span kind is an RPC server.
func (so *SpanObserver) OnFinish(options opentracing.FinishOptions) {
so.mux.Lock()
defer so.mux.Unlock()
if so.operationName == "" || so.kind != Inbound {
return
}
mets := so.metricsByEndpoint.get(so.operationName)
latency := options.FinishTime.Sub(so.startTime)
if so.err {
mets.RequestCountFailures.Inc(1)
mets.RequestLatencyFailures.Record(latency)
} else {
mets.RequestCountSuccess.Inc(1)
mets.RequestLatencySuccess.Record(latency)
}
mets.recordHTTPStatusCode(so.httpStatusCode)
}
// OnSetOperationName records new operation name.
func (so *SpanObserver) OnSetOperationName(operationName string) {
so.mux.Lock()
so.operationName = operationName
so.mux.Unlock()
}
// OnSetTag implements SpanObserver
func (so *SpanObserver) OnSetTag(key string, value interface{}) {
so.mux.Lock()
so.handleTagInLock(key, value)
so.mux.Unlock()
}