traefik/vendor/github.com/openzipkin-contrib/zipkin-go-opentracing/propagation_ot.go

270 lines
6.8 KiB
Go
Raw Normal View History

2018-01-10 17:48:04 +01:00
package zipkintracer
import (
"encoding/binary"
2019-04-05 11:58:06 +02:00
"fmt"
2018-01-10 17:48:04 +01:00
"io"
"strconv"
"strings"
"github.com/gogo/protobuf/proto"
opentracing "github.com/opentracing/opentracing-go"
2019-04-05 11:58:06 +02:00
"github.com/openzipkin-contrib/zipkin-go-opentracing/flag"
"github.com/openzipkin-contrib/zipkin-go-opentracing/types"
"github.com/openzipkin-contrib/zipkin-go-opentracing/wire"
2018-01-10 17:48:04 +01:00
)
type textMapPropagator struct {
tracer *tracerImpl
}
type binaryPropagator struct {
tracer *tracerImpl
}
const (
prefixTracerState = "x-b3-" // we default to interop with non-opentracing zipkin tracers
prefixBaggage = "ot-baggage-"
2019-04-05 11:58:06 +02:00
zipkinTraceID = prefixTracerState + "traceid"
zipkinSpanID = prefixTracerState + "spanid"
zipkinParentSpanID = prefixTracerState + "parentspanid"
zipkinSampled = prefixTracerState + "sampled"
zipkinFlags = prefixTracerState + "flags"
2018-01-10 17:48:04 +01:00
)
func (p *textMapPropagator) Inject(
spanContext opentracing.SpanContext,
opaqueCarrier interface{},
) error {
sc, ok := spanContext.(SpanContext)
if !ok {
return opentracing.ErrInvalidSpanContext
}
carrier, ok := opaqueCarrier.(opentracing.TextMapWriter)
if !ok {
return opentracing.ErrInvalidCarrier
}
2019-04-05 11:58:06 +02:00
// only inject IDs if both trace ID and span ID are present
if !sc.TraceID.Empty() && sc.SpanID > 0 {
carrier.Set(zipkinTraceID, sc.TraceID.ToHex())
carrier.Set(zipkinSpanID, fmt.Sprintf("%016x", sc.SpanID))
if sc.ParentSpanID != nil {
// we only set ParentSpanID header if there is a parent span
carrier.Set(zipkinParentSpanID, fmt.Sprintf("%016x", *sc.ParentSpanID))
}
}
if sc.Sampled {
carrier.Set(zipkinSampled, "1")
} else {
carrier.Set(zipkinSampled, "0")
2018-01-10 17:48:04 +01:00
}
2019-04-05 11:58:06 +02:00
2018-01-10 17:48:04 +01:00
// we only need to inject the debug flag if set. see flag package for details.
flags := sc.Flags & flag.Debug
carrier.Set(zipkinFlags, strconv.FormatUint(uint64(flags), 10))
for k, v := range sc.Baggage {
carrier.Set(prefixBaggage+k, v)
}
return nil
}
func (p *textMapPropagator) Extract(
opaqueCarrier interface{},
) (opentracing.SpanContext, error) {
carrier, ok := opaqueCarrier.(opentracing.TextMapReader)
if !ok {
return nil, opentracing.ErrInvalidCarrier
}
requiredFieldCount := 0
var (
traceID types.TraceID
spanID uint64
sampled bool
parentSpanID *uint64
flags flag.Flags
err error
)
decodedBaggage := make(map[string]string)
2019-04-05 11:58:06 +02:00
var traceIDFound, spanIDFound bool
2018-01-10 17:48:04 +01:00
err = carrier.ForeachKey(func(k, v string) error {
switch strings.ToLower(k) {
case zipkinTraceID:
traceID, err = types.TraceIDFromHex(v)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
2019-04-05 11:58:06 +02:00
// mark TraceID as found
if !traceIDFound {
requiredFieldCount++
traceIDFound = true
}
2018-01-10 17:48:04 +01:00
case zipkinSpanID:
spanID, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
2019-04-05 11:58:06 +02:00
// mark SpanID as found
if !spanIDFound {
requiredFieldCount++
spanIDFound = true
}
2018-01-10 17:48:04 +01:00
case zipkinParentSpanID:
var id uint64
id, err = strconv.ParseUint(v, 16, 64)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
parentSpanID = &id
case zipkinSampled:
sampled, err = strconv.ParseBool(v)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
// Sampled header was explicitly set
flags |= flag.SamplingSet
case zipkinFlags:
var f uint64
f, err = strconv.ParseUint(v, 10, 64)
if err != nil {
return opentracing.ErrSpanContextCorrupted
}
if flag.Flags(f)&flag.Debug == flag.Debug {
flags |= flag.Debug
}
default:
lowercaseK := strings.ToLower(k)
if strings.HasPrefix(lowercaseK, prefixBaggage) {
decodedBaggage[strings.TrimPrefix(lowercaseK, prefixBaggage)] = v
}
}
return nil
})
if err != nil {
return nil, err
}
2019-04-05 11:58:06 +02:00
// required fields must both be present or neither be present
if requiredFieldCount != 0 && requiredFieldCount != 2 {
2018-01-10 17:48:04 +01:00
return nil, opentracing.ErrSpanContextCorrupted
}
// check if Sample state was communicated through the Flags bitset
if !sampled && flags&flag.Sampled == flag.Sampled {
sampled = true
}
return SpanContext{
TraceID: traceID,
SpanID: spanID,
Sampled: sampled,
Baggage: decodedBaggage,
ParentSpanID: parentSpanID,
Flags: flags,
}, nil
}
func (p *binaryPropagator) Inject(
spanContext opentracing.SpanContext,
opaqueCarrier interface{},
) error {
sc, ok := spanContext.(SpanContext)
if !ok {
return opentracing.ErrInvalidSpanContext
}
carrier, ok := opaqueCarrier.(io.Writer)
if !ok {
return opentracing.ErrInvalidCarrier
}
state := wire.TracerState{}
state.TraceId = sc.TraceID.Low
state.TraceIdHigh = sc.TraceID.High
state.SpanId = sc.SpanID
state.Sampled = sc.Sampled
state.BaggageItems = sc.Baggage
// encode the debug bit
flags := sc.Flags & flag.Debug
if sc.ParentSpanID != nil {
state.ParentSpanId = *sc.ParentSpanID
} else {
// root span...
state.ParentSpanId = 0
flags |= flag.IsRoot
}
// we explicitly inform our sampling state downstream
flags |= flag.SamplingSet
if sc.Sampled {
flags |= flag.Sampled
}
state.Flags = uint64(flags)
b, err := proto.Marshal(&state)
if err != nil {
return err
}
// Write the length of the marshalled binary to the writer.
length := uint32(len(b))
if err = binary.Write(carrier, binary.BigEndian, &length); err != nil {
return err
}
_, err = carrier.Write(b)
return err
}
func (p *binaryPropagator) Extract(
opaqueCarrier interface{},
) (opentracing.SpanContext, error) {
carrier, ok := opaqueCarrier.(io.Reader)
if !ok {
return nil, opentracing.ErrInvalidCarrier
}
// Read the length of marshalled binary. io.ReadAll isn't that performant
// since it keeps resizing the underlying buffer as it encounters more bytes
// to read. By reading the length, we can allocate a fixed sized buf and read
// the exact amount of bytes into it.
var length uint32
if err := binary.Read(carrier, binary.BigEndian, &length); err != nil {
return nil, opentracing.ErrSpanContextCorrupted
}
buf := make([]byte, length)
if n, err := carrier.Read(buf); err != nil {
if n > 0 {
return nil, opentracing.ErrSpanContextCorrupted
}
return nil, opentracing.ErrSpanContextNotFound
}
ctx := wire.TracerState{}
if err := proto.Unmarshal(buf, &ctx); err != nil {
return nil, opentracing.ErrSpanContextCorrupted
}
flags := flag.Flags(ctx.Flags)
if flags&flag.Sampled == flag.Sampled {
ctx.Sampled = true
}
// this propagator expects sampling state to be explicitly propagated by the
// upstream service. so set this flag to indentify to tracer it should not
// run its sampler in case it is not the root of the trace.
flags |= flag.SamplingSet
return SpanContext{
TraceID: types.TraceID{Low: ctx.TraceId, High: ctx.TraceIdHigh},
SpanID: ctx.SpanId,
Sampled: ctx.Sampled,
Baggage: ctx.BaggageItems,
ParentSpanID: &ctx.ParentSpanId,
Flags: flags,
}, nil
}