package tracer import ( "errors" "log" "os" "strconv" "time" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" ) var _ ddtrace.Tracer = (*tracer)(nil) // tracer creates, buffers and submits Spans which are used to time blocks of // computation. They are accumulated and streamed into an internal payload, // which is flushed to the agent whenever its size exceeds a specific threshold // or when a certain interval of time has passed, whichever happens first. // // tracer operates based on a worker loop which responds to various request // channels. It additionally holds two buffers which accumulates error and trace // queues to be processed by the payload encoder. type tracer struct { *config *payload flushAllReq chan chan<- struct{} flushTracesReq chan struct{} flushErrorsReq chan struct{} exitReq chan struct{} payloadQueue chan []*span errorBuffer chan error // stopped is a channel that will be closed when the worker has exited. stopped chan struct{} // syncPush is used for testing. When non-nil, it causes pushTrace to become // a synchronous (blocking) operation, meaning that it will only return after // the trace has been fully processed and added onto the payload. syncPush chan struct{} } const ( // flushInterval is the interval at which the payload contents will be flushed // to the transport. flushInterval = 2 * time.Second // payloadMaxLimit is the maximum payload size allowed and should indicate the // maximum size of the package that the agent can receive. payloadMaxLimit = 9.5 * 1024 * 1024 // 9.5 MB // payloadSizeLimit specifies the maximum allowed size of the payload before // it will trigger a flush to the transport. payloadSizeLimit = payloadMaxLimit / 2 ) // Start starts the tracer with the given set of options. It will stop and replace // any running tracer, meaning that calling it several times will result in a restart // of the tracer by replacing the current instance with a new one. func Start(opts ...StartOption) { if internal.Testing { return // mock tracer active } t := internal.GetGlobalTracer() internal.SetGlobalTracer(newTracer(opts...)) t.Stop() } // Stop stops the started tracer. Subsequent calls are valid but become no-op. func Stop() { internal.SetGlobalTracer(&internal.NoopTracer{}) } // Span is an alias for ddtrace.Span. It is here to allow godoc to group methods returning // ddtrace.Span. It is recommended and is considered more correct to refer to this type as // ddtrace.Span instead. type Span = ddtrace.Span // StartSpan starts a new span with the given operation name and set of options. // If the tracer is not started, calling this function is a no-op. func StartSpan(operationName string, opts ...StartSpanOption) Span { return internal.GetGlobalTracer().StartSpan(operationName, opts...) } // Extract extracts a SpanContext from the carrier. The carrier is expected // to implement TextMapReader, otherwise an error is returned. // If the tracer is not started, calling this function is a no-op. func Extract(carrier interface{}) (ddtrace.SpanContext, error) { return internal.GetGlobalTracer().Extract(carrier) } // Inject injects the given SpanContext into the carrier. The carrier is // expected to implement TextMapWriter, otherwise an error is returned. // If the tracer is not started, calling this function is a no-op. func Inject(ctx ddtrace.SpanContext, carrier interface{}) error { return internal.GetGlobalTracer().Inject(ctx, carrier) } const ( // payloadQueueSize is the buffer size of the trace channel. payloadQueueSize = 1000 // errorBufferSize is the buffer size of the error channel. errorBufferSize = 200 ) func newTracer(opts ...StartOption) *tracer { c := new(config) defaults(c) for _, fn := range opts { fn(c) } if c.transport == nil { c.transport = newTransport(c.agentAddr) } if c.propagator == nil { c.propagator = NewPropagator(nil) } t := &tracer{ config: c, payload: newPayload(), flushAllReq: make(chan chan<- struct{}), flushTracesReq: make(chan struct{}, 1), flushErrorsReq: make(chan struct{}, 1), exitReq: make(chan struct{}), payloadQueue: make(chan []*span, payloadQueueSize), errorBuffer: make(chan error, errorBufferSize), stopped: make(chan struct{}), } go t.worker() return t } // worker receives finished traces to be added into the payload, as well // as periodically flushes traces to the transport. func (t *tracer) worker() { defer close(t.stopped) ticker := time.NewTicker(flushInterval) defer ticker.Stop() for { select { case trace := <-t.payloadQueue: t.pushPayload(trace) case <-ticker.C: t.flush() case done := <-t.flushAllReq: t.flush() done <- struct{}{} case <-t.flushTracesReq: t.flushTraces() case <-t.flushErrorsReq: t.flushErrors() case <-t.exitReq: t.flush() return } } } func (t *tracer) pushTrace(trace []*span) { select { case <-t.stopped: return default: } select { case t.payloadQueue <- trace: default: t.pushError(&dataLossError{ context: errors.New("payload queue full, dropping trace"), count: len(trace), }) } if t.syncPush != nil { // only in tests <-t.syncPush } } func (t *tracer) pushError(err error) { select { case <-t.stopped: return default: } if len(t.errorBuffer) >= cap(t.errorBuffer)/2 { // starts being full, anticipate, try and flush soon select { case t.flushErrorsReq <- struct{}{}: default: // a flush was already requested, skip } } select { case t.errorBuffer <- err: default: // OK, if we get this, our error error buffer is full, // we can assume it is filled with meaningful messages which // are going to be logged and hopefully read, nothing better // we can do, blocking would make things worse. } } // StartSpan creates, starts, and returns a new Span with the given `operationName`. func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOption) ddtrace.Span { var opts ddtrace.StartSpanConfig for _, fn := range options { fn(&opts) } var startTime int64 if opts.StartTime.IsZero() { startTime = now() } else { startTime = opts.StartTime.UnixNano() } var context *spanContext if opts.Parent != nil { if ctx, ok := opts.Parent.(*spanContext); ok { context = ctx } } id := random.Uint64() // span defaults span := &span{ Name: operationName, Service: t.config.serviceName, Resource: operationName, Meta: map[string]string{}, Metrics: map[string]float64{}, SpanID: id, TraceID: id, ParentID: 0, Start: startTime, } if context != nil { // this is a child span span.TraceID = context.traceID span.ParentID = context.spanID if context.hasSamplingPriority() { span.Metrics[samplingPriorityKey] = float64(context.samplingPriority()) } if context.span != nil { context.span.RLock() span.Service = context.span.Service context.span.RUnlock() } } span.context = newSpanContext(span, context) if context == nil || context.span == nil { // this is either a global root span or a process-level root span span.SetTag(ext.Pid, strconv.Itoa(os.Getpid())) t.sample(span) } // add tags from options for k, v := range opts.Tags { span.SetTag(k, v) } // add global tags for k, v := range t.config.globalTags { span.SetTag(k, v) } return span } // Stop stops the tracer. func (t *tracer) Stop() { select { case <-t.stopped: return default: t.exitReq <- struct{}{} <-t.stopped } } // Inject uses the configured or default TextMap Propagator. func (t *tracer) Inject(ctx ddtrace.SpanContext, carrier interface{}) error { return t.config.propagator.Inject(ctx, carrier) } // Extract uses the configured or default TextMap Propagator. func (t *tracer) Extract(carrier interface{}) (ddtrace.SpanContext, error) { return t.config.propagator.Extract(carrier) } // flushTraces will push any currently buffered traces to the server. func (t *tracer) flushTraces() { if t.payload.itemCount() == 0 { return } size, count := t.payload.size(), t.payload.itemCount() if t.config.debug { log.Printf("Sending payload: size: %d traces: %d\n", size, count) } err := t.config.transport.send(t.payload) if err != nil && size > payloadMaxLimit { // we couldn't send the payload and it is getting too big to be // accepted by the agent, we have to drop it. t.payload.reset() t.pushError(&dataLossError{context: err, count: count}) } if err == nil { // send succeeded t.payload.reset() } } // flushErrors will process log messages that were queued func (t *tracer) flushErrors() { logErrors(t.errorBuffer) } func (t *tracer) flush() { t.flushTraces() t.flushErrors() } // forceFlush forces a flush of data (traces and services) to the agent. // Flushes are done by a background task on a regular basis, so you never // need to call this manually, mostly useful for testing and debugging. func (t *tracer) forceFlush() { done := make(chan struct{}) t.flushAllReq <- done <-done } // pushPayload pushes the trace onto the payload. If the payload becomes // larger than the threshold as a result, it sends a flush request. func (t *tracer) pushPayload(trace []*span) { if err := t.payload.push(trace); err != nil { t.pushError(&traceEncodingError{context: err}) } if t.payload.size() > payloadSizeLimit { // getting large select { case t.flushTracesReq <- struct{}{}: default: // flush already queued } } if t.syncPush != nil { // only in tests t.syncPush <- struct{}{} } } // sampleRateMetricKey is the metric key holding the applied sample rate. Has to be the same as the Agent. const sampleRateMetricKey = "_sample_rate" // Sample samples a span with the internal sampler. func (t *tracer) sample(span *span) { sampler := t.config.sampler sampled := sampler.Sample(span) span.context.sampled = sampled if !sampled { return } if rs, ok := sampler.(RateSampler); ok && rs.Rate() < 1 { // the span was sampled using a rate sampler which wasn't all permissive, // so we make note of the sampling rate. span.Lock() defer span.Unlock() if span.finished { // we don't touch finished span as they might be flushing return } span.Metrics[sampleRateMetricKey] = rs.Rate() } }