Add Tracing Header Context Name option for Jaeger
This commit is contained in:
parent
f0ee2890b2
commit
156f6b8d3c
17 changed files with 327 additions and 49 deletions
8
Gopkg.lock
generated
8
Gopkg.lock
generated
|
@ -1592,7 +1592,7 @@
|
|||
revision = "03389da7e0bf9844767f82690f4d68fc097a1306"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:d69d06aa6732b9df6ab33998db450ffe0cae9f98e7d019ef3bebac8fd8cd0138"
|
||||
digest = "1:7d3a890e525da3b7014d26dd1d4a0e4d31a479995007cd11989ad31db132e66c"
|
||||
name = "github.com/uber/jaeger-client-go"
|
||||
packages = [
|
||||
".",
|
||||
|
@ -1610,12 +1610,13 @@
|
|||
"thrift-gen/jaeger",
|
||||
"thrift-gen/sampling",
|
||||
"thrift-gen/zipkincore",
|
||||
"transport",
|
||||
"utils",
|
||||
"zipkin",
|
||||
]
|
||||
pruneopts = "NUT"
|
||||
revision = "b043381d944715b469fd6b37addfd30145ca1758"
|
||||
version = "v2.14.0"
|
||||
revision = "1a782e2da844727691fef1757c72eb190c2909f0"
|
||||
version = "v2.15.0"
|
||||
|
||||
[[projects]]
|
||||
digest = "1:0f09db8429e19d57c8346ad76fbbc679341fa86073d3b8fb5ac919f0357d8f4c"
|
||||
|
@ -2350,6 +2351,7 @@
|
|||
"github.com/stretchr/testify/require",
|
||||
"github.com/stvp/go-udp-testing",
|
||||
"github.com/thoas/stats",
|
||||
"github.com/uber/jaeger-client-go",
|
||||
"github.com/uber/jaeger-client-go/config",
|
||||
"github.com/uber/jaeger-client-go/zipkin",
|
||||
"github.com/uber/jaeger-lib/metrics",
|
||||
|
|
|
@ -159,7 +159,7 @@
|
|||
|
||||
[[constraint]]
|
||||
name = "github.com/uber/jaeger-client-go"
|
||||
version = "2.14.0"
|
||||
version = "2.15.0"
|
||||
|
||||
[[constraint]]
|
||||
name = "github.com/uber/jaeger-lib"
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/containous/traefik/tracing/jaeger"
|
||||
"github.com/containous/traefik/tracing/zipkin"
|
||||
"github.com/containous/traefik/types"
|
||||
jaegercli "github.com/uber/jaeger-client-go"
|
||||
)
|
||||
|
||||
// TraefikConfiguration holds GlobalConfiguration and other stuff
|
||||
|
@ -98,6 +99,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
|
|||
LocalAgentHostPort: "127.0.0.1:6831",
|
||||
Propagation: "jaeger",
|
||||
Gen128Bit: false,
|
||||
TraceContextHeaderName: jaegercli.TraceContextHeaderName,
|
||||
},
|
||||
Zipkin: &zipkin.Config{
|
||||
HTTPEndpoint: "http://localhost:9411/api/v1/spans",
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/containous/traefik/tracing/zipkin"
|
||||
"github.com/containous/traefik/types"
|
||||
"github.com/elazarl/go-bindata-assetfs"
|
||||
jaegercli "github.com/uber/jaeger-client-go"
|
||||
"github.com/xenolf/lego/challenge/dns01"
|
||||
)
|
||||
|
||||
|
@ -232,6 +233,7 @@ func (c *Configuration) initTracing() {
|
|||
LocalAgentHostPort: "127.0.0.1:6831",
|
||||
Propagation: "jaeger",
|
||||
Gen128Bit: false,
|
||||
TraceContextHeaderName: jaegercli.TraceContextHeaderName,
|
||||
}
|
||||
}
|
||||
if c.Tracing.Zipkin != nil {
|
||||
|
|
|
@ -70,6 +70,13 @@ Traefik supports three tracing backends: Jaeger, Zipkin and DataDog.
|
|||
#
|
||||
# Default: "jaeger"
|
||||
propagation = "jaeger"
|
||||
|
||||
# Trace Context Header Name is the http header name used to propagate tracing context.
|
||||
# This must be in lower-case to avoid mismatches when decoding incoming headers.
|
||||
#
|
||||
# Default: "uber-trace-id"
|
||||
#
|
||||
traceContextHeaderName = "uber-trace-id"
|
||||
```
|
||||
|
||||
!!! warning
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/containous/traefik/log"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
jaeger "github.com/uber/jaeger-client-go"
|
||||
jaegercfg "github.com/uber/jaeger-client-go/config"
|
||||
"github.com/uber/jaeger-client-go/zipkin"
|
||||
jaegermet "github.com/uber/jaeger-lib/metrics"
|
||||
|
@ -22,6 +23,7 @@ type Config struct {
|
|||
LocalAgentHostPort string `description:"set jaeger-agent's host:port that the reporter will used." export:"false"`
|
||||
Gen128Bit bool `description:"generate 128 bit span IDs." export:"true"`
|
||||
Propagation string `description:"which propgation format to use (jaeger/b3)." export:"true"`
|
||||
TraceContextHeaderName string `description:"set the header to use for the trace-id." export:"true"`
|
||||
}
|
||||
|
||||
// Setup sets up the tracer
|
||||
|
@ -36,6 +38,9 @@ func (c *Config) Setup(componentName string) (opentracing.Tracer, io.Closer, err
|
|||
LogSpans: true,
|
||||
LocalAgentHostPort: c.LocalAgentHostPort,
|
||||
},
|
||||
Headers: &jaeger.HeadersConfig{
|
||||
TraceContextHeaderName: c.TraceContextHeaderName,
|
||||
},
|
||||
}
|
||||
|
||||
jMetricsFactory := jaegermet.NullFactory
|
||||
|
|
24
vendor/github.com/uber/jaeger-client-go/config/config.go
generated
vendored
24
vendor/github.com/uber/jaeger-client-go/config/config.go
generated
vendored
|
@ -27,6 +27,7 @@ import (
|
|||
"github.com/uber/jaeger-client-go/internal/baggage/remote"
|
||||
throttler "github.com/uber/jaeger-client-go/internal/throttler/remote"
|
||||
"github.com/uber/jaeger-client-go/rpcmetrics"
|
||||
"github.com/uber/jaeger-client-go/transport"
|
||||
)
|
||||
|
||||
const defaultSamplingProbability = 0.001
|
||||
|
@ -108,6 +109,18 @@ type ReporterConfig struct {
|
|||
// LocalAgentHostPort instructs reporter to send spans to jaeger-agent at this address
|
||||
// Can be set by exporting an environment variable named JAEGER_AGENT_HOST / JAEGER_AGENT_PORT
|
||||
LocalAgentHostPort string `yaml:"localAgentHostPort"`
|
||||
|
||||
// CollectorEndpoint instructs reporter to send spans to jaeger-collector at this URL
|
||||
// Can be set by exporting an environment variable named JAEGER_ENDPOINT
|
||||
CollectorEndpoint string `yaml:"collectorEndpoint"`
|
||||
|
||||
// User instructs reporter to include a user for basic http authentication when sending spans to jaeger-collector.
|
||||
// Can be set by exporting an environment variable named JAEGER_USER
|
||||
User string `yaml:"user"`
|
||||
|
||||
// Password instructs reporter to include a password for basic http authentication when sending spans to
|
||||
// jaeger-collector. Can be set by exporting an environment variable named JAEGER_PASSWORD
|
||||
Password string `yaml:"password"`
|
||||
}
|
||||
|
||||
// BaggageRestrictionsConfig configures the baggage restrictions manager which can be used to whitelist
|
||||
|
@ -218,6 +231,7 @@ func (c Configuration) NewTracer(options ...Option) (opentracing.Tracer, io.Clos
|
|||
jaeger.TracerOptions.CustomHeaderKeys(c.Headers),
|
||||
jaeger.TracerOptions.Gen128Bit(opts.gen128Bit),
|
||||
jaeger.TracerOptions.ZipkinSharedRPCSpan(opts.zipkinSharedRPCSpan),
|
||||
jaeger.TracerOptions.MaxTagValueLength(opts.maxTagValueLength),
|
||||
}
|
||||
|
||||
for _, tag := range opts.tags {
|
||||
|
@ -344,7 +358,7 @@ func (sc *SamplerConfig) NewSampler(
|
|||
return nil, fmt.Errorf("Unknown sampler type %v", sc.Type)
|
||||
}
|
||||
|
||||
// NewReporter instantiates a new reporter that submits spans to tcollector
|
||||
// NewReporter instantiates a new reporter that submits spans to the collector
|
||||
func (rc *ReporterConfig) NewReporter(
|
||||
serviceName string,
|
||||
metrics *jaeger.Metrics,
|
||||
|
@ -368,5 +382,13 @@ func (rc *ReporterConfig) NewReporter(
|
|||
}
|
||||
|
||||
func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
|
||||
switch {
|
||||
case rc.CollectorEndpoint != "" && rc.User != "" && rc.Password != "":
|
||||
return transport.NewHTTPTransport(rc.CollectorEndpoint, transport.HTTPBatchSize(1),
|
||||
transport.HTTPBasicAuth(rc.User, rc.Password)), nil
|
||||
case rc.CollectorEndpoint != "":
|
||||
return transport.NewHTTPTransport(rc.CollectorEndpoint, transport.HTTPBatchSize(1)), nil
|
||||
default:
|
||||
return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
|
||||
}
|
||||
}
|
||||
|
|
27
vendor/github.com/uber/jaeger-client-go/config/config_env.go
generated
vendored
27
vendor/github.com/uber/jaeger-client-go/config/config_env.go
generated
vendored
|
@ -16,6 +16,7 @@ package config
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -41,6 +42,9 @@ const (
|
|||
envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE"
|
||||
envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL"
|
||||
envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS"
|
||||
envEndpoint = "JAEGER_ENDPOINT"
|
||||
envUser = "JAEGER_USER"
|
||||
envPassword = "JAEGER_PASSWORD"
|
||||
envAgentHost = "JAEGER_AGENT_HOST"
|
||||
envAgentPort = "JAEGER_AGENT_PORT"
|
||||
)
|
||||
|
@ -156,12 +160,19 @@ func reporterConfigFromEnv() (*ReporterConfig, error) {
|
|||
}
|
||||
|
||||
host := jaeger.DefaultUDPSpanServerHost
|
||||
ep := os.Getenv(envEndpoint)
|
||||
if e := os.Getenv(envAgentHost); e != "" {
|
||||
if ep != "" {
|
||||
return nil, errors.Errorf("cannot set env vars %s and %s together", envAgentHost, envEndpoint)
|
||||
}
|
||||
host = e
|
||||
}
|
||||
|
||||
port := jaeger.DefaultUDPSpanServerPort
|
||||
if e := os.Getenv(envAgentPort); e != "" {
|
||||
if ep != "" {
|
||||
return nil, errors.Errorf("cannot set env vars %s and %s together", envAgentPort, envEndpoint)
|
||||
}
|
||||
if value, err := strconv.ParseInt(e, 10, 0); err == nil {
|
||||
port = int(value)
|
||||
} else {
|
||||
|
@ -173,6 +184,22 @@ func reporterConfigFromEnv() (*ReporterConfig, error) {
|
|||
// were not explicitly passed
|
||||
rc.LocalAgentHostPort = fmt.Sprintf("%s:%d", host, port)
|
||||
|
||||
if ep != "" {
|
||||
u, err := url.ParseRequestURI(ep)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envEndpoint, ep)
|
||||
}
|
||||
rc.CollectorEndpoint = fmt.Sprintf("%s", u)
|
||||
}
|
||||
|
||||
user := os.Getenv(envUser)
|
||||
pswd := os.Getenv(envPassword)
|
||||
if user != "" && pswd == "" || user == "" && pswd != "" {
|
||||
return nil, errors.Errorf("you must set %s and %s env vars together", envUser, envPassword)
|
||||
}
|
||||
rc.User = user
|
||||
rc.Password = pswd
|
||||
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
|
|
8
vendor/github.com/uber/jaeger-client-go/config/options.go
generated
vendored
8
vendor/github.com/uber/jaeger-client-go/config/options.go
generated
vendored
|
@ -34,6 +34,7 @@ type Options struct {
|
|||
observers []jaeger.Observer
|
||||
gen128Bit bool
|
||||
zipkinSharedRPCSpan bool
|
||||
maxTagValueLength int
|
||||
tags []opentracing.Tag
|
||||
injectors map[interface{}]jaeger.Injector
|
||||
extractors map[interface{}]jaeger.Extractor
|
||||
|
@ -101,6 +102,13 @@ func ZipkinSharedRPCSpan(zipkinSharedRPCSpan bool) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// MaxTagValueLength can be provided to override the default max tag value length.
|
||||
func MaxTagValueLength(maxTagValueLength int) Option {
|
||||
return func(c *Options) {
|
||||
c.maxTagValueLength = maxTagValueLength
|
||||
}
|
||||
}
|
||||
|
||||
// Tag creates an option that adds a tracer-level tag.
|
||||
func Tag(key string, value interface{}) Option {
|
||||
return func(c *Options) {
|
||||
|
|
5
vendor/github.com/uber/jaeger-client-go/constants.go
generated
vendored
5
vendor/github.com/uber/jaeger-client-go/constants.go
generated
vendored
|
@ -16,7 +16,7 @@ package jaeger
|
|||
|
||||
const (
|
||||
// JaegerClientVersion is the version of the client library reported as Span tag.
|
||||
JaegerClientVersion = "Go-2.14.0"
|
||||
JaegerClientVersion = "Go-2.15.0"
|
||||
|
||||
// JaegerClientVersionTagKey is the name of the tag used to report client version.
|
||||
JaegerClientVersionTagKey = "jaeger.version"
|
||||
|
@ -82,4 +82,7 @@ const (
|
|||
|
||||
// DefaultUDPSpanServerPort is the default port to send the spans to, via UDP
|
||||
DefaultUDPSpanServerPort = 6831
|
||||
|
||||
// DefaultMaxTagValueLength is the default max length of byte array or string allowed in the tag value.
|
||||
DefaultMaxTagValueLength = 256
|
||||
)
|
||||
|
|
18
vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go
generated
vendored
18
vendor/github.com/uber/jaeger-client-go/jaeger_thrift_span.go
generated
vendored
|
@ -38,7 +38,7 @@ func BuildJaegerThrift(span *Span) *j.Span {
|
|||
Flags: int32(span.context.flags),
|
||||
StartTime: startTime,
|
||||
Duration: duration,
|
||||
Tags: buildTags(span.tags),
|
||||
Tags: buildTags(span.tags, span.tracer.options.maxTagValueLength),
|
||||
Logs: buildLogs(span.logs),
|
||||
References: buildReferences(span.references),
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ func BuildJaegerProcessThrift(span *Span) *j.Process {
|
|||
func buildJaegerProcessThrift(tracer *Tracer) *j.Process {
|
||||
process := &j.Process{
|
||||
ServiceName: tracer.serviceName,
|
||||
Tags: buildTags(tracer.tags),
|
||||
Tags: buildTags(tracer.tags, tracer.options.maxTagValueLength),
|
||||
}
|
||||
if tracer.process.UUID != "" {
|
||||
process.Tags = append(process.Tags, &j.Tag{Key: TracerUUIDTagKey, VStr: &tracer.process.UUID, VType: j.TagType_STRING})
|
||||
|
@ -63,10 +63,10 @@ func buildJaegerProcessThrift(tracer *Tracer) *j.Process {
|
|||
return process
|
||||
}
|
||||
|
||||
func buildTags(tags []Tag) []*j.Tag {
|
||||
func buildTags(tags []Tag, maxTagValueLength int) []*j.Tag {
|
||||
jTags := make([]*j.Tag, 0, len(tags))
|
||||
for _, tag := range tags {
|
||||
jTag := buildTag(&tag)
|
||||
jTag := buildTag(&tag, maxTagValueLength)
|
||||
jTags = append(jTags, jTag)
|
||||
}
|
||||
return jTags
|
||||
|
@ -84,16 +84,16 @@ func buildLogs(logs []opentracing.LogRecord) []*j.Log {
|
|||
return jLogs
|
||||
}
|
||||
|
||||
func buildTag(tag *Tag) *j.Tag {
|
||||
func buildTag(tag *Tag, maxTagValueLength int) *j.Tag {
|
||||
jTag := &j.Tag{Key: tag.key}
|
||||
switch value := tag.value.(type) {
|
||||
case string:
|
||||
vStr := truncateString(value)
|
||||
vStr := truncateString(value, maxTagValueLength)
|
||||
jTag.VStr = &vStr
|
||||
jTag.VType = j.TagType_STRING
|
||||
case []byte:
|
||||
if len(value) > maxAnnotationLength {
|
||||
value = value[:maxAnnotationLength]
|
||||
if len(value) > maxTagValueLength {
|
||||
value = value[:maxTagValueLength]
|
||||
}
|
||||
jTag.VBinary = value
|
||||
jTag.VType = j.TagType_BINARY
|
||||
|
@ -150,7 +150,7 @@ func buildTag(tag *Tag) *j.Tag {
|
|||
jTag.VBool = &vBool
|
||||
jTag.VType = j.TagType_BOOL
|
||||
default:
|
||||
vStr := truncateString(stringify(value))
|
||||
vStr := truncateString(stringify(value), maxTagValueLength)
|
||||
jTag.VStr = &vStr
|
||||
jTag.VType = j.TagType_STRING
|
||||
}
|
||||
|
|
1
vendor/github.com/uber/jaeger-client-go/sampler.go
generated
vendored
1
vendor/github.com/uber/jaeger-client-go/sampler.go
generated
vendored
|
@ -511,6 +511,7 @@ func (s *RemotelyControlledSampler) updateSampler() {
|
|||
res, err := s.manager.GetSamplingStrategy(s.serviceName)
|
||||
if err != nil {
|
||||
s.metrics.SamplerQueryFailure.Inc(1)
|
||||
s.logger.Infof("Unable to query sampling strategy: %v", err)
|
||||
return
|
||||
}
|
||||
s.Lock()
|
||||
|
|
14
vendor/github.com/uber/jaeger-client-go/tracer.go
generated
vendored
14
vendor/github.com/uber/jaeger-client-go/tracer.go
generated
vendored
|
@ -50,6 +50,7 @@ type Tracer struct {
|
|||
gen128Bit bool // whether to generate 128bit trace IDs
|
||||
zipkinSharedRPCSpan bool
|
||||
highTraceIDGenerator func() uint64 // custom high trace ID generator
|
||||
maxTagValueLength int
|
||||
// more options to come
|
||||
}
|
||||
// pool for Span objects
|
||||
|
@ -152,6 +153,9 @@ func NewTracer(
|
|||
t.logger.Error("Overriding high trace ID generator but not generating " +
|
||||
"128 bit trace IDs, consider enabling the \"Gen128Bit\" option")
|
||||
}
|
||||
if t.options.maxTagValueLength == 0 {
|
||||
t.options.maxTagValueLength = DefaultMaxTagValueLength
|
||||
}
|
||||
t.process = Process{
|
||||
Service: serviceName,
|
||||
UUID: strconv.FormatUint(t.randomNumber(), 16),
|
||||
|
@ -194,6 +198,12 @@ func (t *Tracer) startSpanWithOptions(
|
|||
options.StartTime = t.timeNow()
|
||||
}
|
||||
|
||||
// Predicate whether the given span context is a valid reference
|
||||
// which may be used as parent / debug ID / baggage items source
|
||||
isValidReference := func(ctx SpanContext) bool {
|
||||
return ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0
|
||||
}
|
||||
|
||||
var references []Reference
|
||||
var parent SpanContext
|
||||
var hasParent bool // need this because `parent` is a value, not reference
|
||||
|
@ -205,7 +215,7 @@ func (t *Tracer) startSpanWithOptions(
|
|||
reflect.ValueOf(ref.ReferencedContext)))
|
||||
continue
|
||||
}
|
||||
if !(ctx.IsValid() || ctx.isDebugIDContainerOnly() || len(ctx.baggage) != 0) {
|
||||
if !isValidReference(ctx) {
|
||||
continue
|
||||
}
|
||||
references = append(references, Reference{Type: ref.Type, Context: ctx})
|
||||
|
@ -214,7 +224,7 @@ func (t *Tracer) startSpanWithOptions(
|
|||
hasParent = ref.Type == opentracing.ChildOfRef
|
||||
}
|
||||
}
|
||||
if !hasParent && parent.IsValid() {
|
||||
if !hasParent && isValidReference(parent) {
|
||||
// If ChildOfRef wasn't found but a FollowFromRef exists, use the context from
|
||||
// the FollowFromRef as the parent
|
||||
hasParent = true
|
||||
|
|
6
vendor/github.com/uber/jaeger-client-go/tracer_options.go
generated
vendored
6
vendor/github.com/uber/jaeger-client-go/tracer_options.go
generated
vendored
|
@ -128,6 +128,12 @@ func (tracerOptions) HighTraceIDGenerator(highTraceIDGenerator func() uint64) Tr
|
|||
}
|
||||
}
|
||||
|
||||
func (tracerOptions) MaxTagValueLength(maxTagValueLength int) TracerOption {
|
||||
return func(tracer *Tracer) {
|
||||
tracer.options.maxTagValueLength = maxTagValueLength
|
||||
}
|
||||
}
|
||||
|
||||
func (tracerOptions) ZipkinSharedRPCSpan(zipkinSharedRPCSpan bool) TracerOption {
|
||||
return func(tracer *Tracer) {
|
||||
tracer.options.zipkinSharedRPCSpan = zipkinSharedRPCSpan
|
||||
|
|
23
vendor/github.com/uber/jaeger-client-go/transport/doc.go
generated
vendored
Normal file
23
vendor/github.com/uber/jaeger-client-go/transport/doc.go
generated
vendored
Normal file
|
@ -0,0 +1,23 @@
|
|||
// Copyright (c) 2017 Uber Technologies, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package transport defines various transports that can be used with
|
||||
// RemoteReporter to send spans out of process. Transport is responsible
|
||||
// for serializing the spans into a specific format suitable for sending
|
||||
// to the tracing backend. Examples may include Thrift over UDP, Thrift
|
||||
// or JSON over HTTP, Thrift over Kafka, etc.
|
||||
//
|
||||
// Implementations are NOT required to be thread-safe; the RemoteReporter
|
||||
// is expected to only call methods on the Transport from the same go-routine.
|
||||
package transport
|
163
vendor/github.com/uber/jaeger-client-go/transport/http.go
generated
vendored
Normal file
163
vendor/github.com/uber/jaeger-client-go/transport/http.go
generated
vendored
Normal file
|
@ -0,0 +1,163 @@
|
|||
// Copyright (c) 2017 Uber Technologies, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/uber/jaeger-client-go/thrift"
|
||||
|
||||
"github.com/uber/jaeger-client-go"
|
||||
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
|
||||
)
|
||||
|
||||
// Default timeout for http request in seconds
|
||||
const defaultHTTPTimeout = time.Second * 5
|
||||
|
||||
// HTTPTransport implements Transport by forwarding spans to a http server.
|
||||
type HTTPTransport struct {
|
||||
url string
|
||||
client *http.Client
|
||||
batchSize int
|
||||
spans []*j.Span
|
||||
process *j.Process
|
||||
httpCredentials *HTTPBasicAuthCredentials
|
||||
}
|
||||
|
||||
// HTTPBasicAuthCredentials stores credentials for HTTP basic auth.
|
||||
type HTTPBasicAuthCredentials struct {
|
||||
username string
|
||||
password string
|
||||
}
|
||||
|
||||
// HTTPOption sets a parameter for the HttpCollector
|
||||
type HTTPOption func(c *HTTPTransport)
|
||||
|
||||
// HTTPTimeout sets maximum timeout for http request.
|
||||
func HTTPTimeout(duration time.Duration) HTTPOption {
|
||||
return func(c *HTTPTransport) { c.client.Timeout = duration }
|
||||
}
|
||||
|
||||
// HTTPBatchSize sets the maximum batch size, after which a collect will be
|
||||
// triggered. The default batch size is 100 spans.
|
||||
func HTTPBatchSize(n int) HTTPOption {
|
||||
return func(c *HTTPTransport) { c.batchSize = n }
|
||||
}
|
||||
|
||||
// HTTPBasicAuth sets the credentials required to perform HTTP basic auth
|
||||
func HTTPBasicAuth(username string, password string) HTTPOption {
|
||||
return func(c *HTTPTransport) {
|
||||
c.httpCredentials = &HTTPBasicAuthCredentials{username: username, password: password}
|
||||
}
|
||||
}
|
||||
|
||||
// HTTPRoundTripper configures the underlying Transport on the *http.Client
|
||||
// that is used
|
||||
func HTTPRoundTripper(transport http.RoundTripper) HTTPOption {
|
||||
return func(c *HTTPTransport) {
|
||||
c.client.Transport = transport
|
||||
}
|
||||
}
|
||||
|
||||
// NewHTTPTransport returns a new HTTP-backend transport. url should be an http
|
||||
// url of the collector to handle POST request, typically something like:
|
||||
// http://hostname:14268/api/traces?format=jaeger.thrift
|
||||
func NewHTTPTransport(url string, options ...HTTPOption) *HTTPTransport {
|
||||
c := &HTTPTransport{
|
||||
url: url,
|
||||
client: &http.Client{Timeout: defaultHTTPTimeout},
|
||||
batchSize: 100,
|
||||
spans: []*j.Span{},
|
||||
}
|
||||
|
||||
for _, option := range options {
|
||||
option(c)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// Append implements Transport.
|
||||
func (c *HTTPTransport) Append(span *jaeger.Span) (int, error) {
|
||||
if c.process == nil {
|
||||
c.process = jaeger.BuildJaegerProcessThrift(span)
|
||||
}
|
||||
jSpan := jaeger.BuildJaegerThrift(span)
|
||||
c.spans = append(c.spans, jSpan)
|
||||
if len(c.spans) >= c.batchSize {
|
||||
return c.Flush()
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Flush implements Transport.
|
||||
func (c *HTTPTransport) Flush() (int, error) {
|
||||
count := len(c.spans)
|
||||
if count == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
err := c.send(c.spans)
|
||||
c.spans = c.spans[:0]
|
||||
return count, err
|
||||
}
|
||||
|
||||
// Close implements Transport.
|
||||
func (c *HTTPTransport) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *HTTPTransport) send(spans []*j.Span) error {
|
||||
batch := &j.Batch{
|
||||
Spans: spans,
|
||||
Process: c.process,
|
||||
}
|
||||
body, err := serializeThrift(batch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest("POST", c.url, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/x-thrift")
|
||||
|
||||
if c.httpCredentials != nil {
|
||||
req.SetBasicAuth(c.httpCredentials.username, c.httpCredentials.password)
|
||||
}
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode >= http.StatusBadRequest {
|
||||
return fmt.Errorf("error from collector: %d", resp.StatusCode)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func serializeThrift(obj thrift.TStruct) (*bytes.Buffer, error) {
|
||||
t := thrift.NewTMemoryBuffer()
|
||||
p := thrift.NewTBinaryProtocolTransport(t)
|
||||
if err := obj.Write(p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return t.Buffer, nil
|
||||
}
|
23
vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go
generated
vendored
23
vendor/github.com/uber/jaeger-client-go/zipkin_thrift_span.go
generated
vendored
|
@ -27,9 +27,6 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// maxAnnotationLength is the max length of byte array or string allowed in the annotations
|
||||
maxAnnotationLength = 256
|
||||
|
||||
// Zipkin UI does not work well with non-string tag values
|
||||
allowPackedNumbers = false
|
||||
)
|
||||
|
@ -98,7 +95,7 @@ func buildAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.Annotation {
|
|||
Timestamp: utils.TimeToMicrosecondsSinceEpochInt64(log.Timestamp),
|
||||
Host: endpoint}
|
||||
if content, err := spanlog.MaterializeWithJSON(log.Fields); err == nil {
|
||||
anno.Value = truncateString(string(content))
|
||||
anno.Value = truncateString(string(content), span.tracer.options.maxTagValueLength)
|
||||
} else {
|
||||
anno.Value = err.Error()
|
||||
}
|
||||
|
@ -148,21 +145,21 @@ func buildBinaryAnnotations(span *zipkinSpan, endpoint *z.Endpoint) []*z.BinaryA
|
|||
if _, ok := specialTagHandlers[tag.key]; ok {
|
||||
continue
|
||||
}
|
||||
if anno := buildBinaryAnnotation(tag.key, tag.value, nil); anno != nil {
|
||||
if anno := buildBinaryAnnotation(tag.key, tag.value, span.tracer.options.maxTagValueLength, nil); anno != nil {
|
||||
annotations = append(annotations, anno)
|
||||
}
|
||||
}
|
||||
return annotations
|
||||
}
|
||||
|
||||
func buildBinaryAnnotation(key string, val interface{}, endpoint *z.Endpoint) *z.BinaryAnnotation {
|
||||
func buildBinaryAnnotation(key string, val interface{}, maxTagValueLength int, endpoint *z.Endpoint) *z.BinaryAnnotation {
|
||||
bann := &z.BinaryAnnotation{Key: key, Host: endpoint}
|
||||
if value, ok := val.(string); ok {
|
||||
bann.Value = []byte(truncateString(value))
|
||||
bann.Value = []byte(truncateString(value, maxTagValueLength))
|
||||
bann.AnnotationType = z.AnnotationType_STRING
|
||||
} else if value, ok := val.([]byte); ok {
|
||||
if len(value) > maxAnnotationLength {
|
||||
value = value[:maxAnnotationLength]
|
||||
if len(value) > maxTagValueLength {
|
||||
value = value[:maxTagValueLength]
|
||||
}
|
||||
bann.Value = value
|
||||
bann.AnnotationType = z.AnnotationType_BYTES
|
||||
|
@ -180,7 +177,7 @@ func buildBinaryAnnotation(key string, val interface{}, endpoint *z.Endpoint) *z
|
|||
bann.AnnotationType = z.AnnotationType_BOOL
|
||||
} else {
|
||||
value := stringify(val)
|
||||
bann.Value = []byte(truncateString(value))
|
||||
bann.Value = []byte(truncateString(value, maxTagValueLength))
|
||||
bann.AnnotationType = z.AnnotationType_STRING
|
||||
}
|
||||
return bann
|
||||
|
@ -193,12 +190,12 @@ func stringify(value interface{}) string {
|
|||
return fmt.Sprintf("%+v", value)
|
||||
}
|
||||
|
||||
func truncateString(value string) string {
|
||||
func truncateString(value string, maxLength int) string {
|
||||
// we ignore the problem of utf8 runes possibly being sliced in the middle,
|
||||
// as it is rather expensive to iterate through each tag just to find rune
|
||||
// boundaries.
|
||||
if len(value) > maxAnnotationLength {
|
||||
return value[:maxAnnotationLength]
|
||||
if len(value) > maxLength {
|
||||
return value[:maxLength]
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue