traefik/vendor/github.com/instana/go-sensor/recorder.go
2019-02-18 16:52:03 +01:00

175 lines
3.9 KiB
Go

package instana
import (
"sync"
"time"
)
// A SpanRecorder handles all of the `RawSpan` data generated via an
// associated `Tracer` (see `NewStandardTracer`) instance. It also names
// the containing process and provides access to a straightforward tag map.
type SpanRecorder interface {
// Implementations must determine whether and where to store `span`.
RecordSpan(span *spanS)
}
// Recorder accepts spans, processes and queues them
// for delivery to the backend.
type Recorder struct {
sync.RWMutex
spans []jsonSpan
testMode bool
}
// NewRecorder Establish a Recorder span recorder
func NewRecorder() *Recorder {
r := new(Recorder)
r.init()
return r
}
// NewTestRecorder Establish a new span recorder used for testing
func NewTestRecorder() *Recorder {
r := new(Recorder)
r.testMode = true
r.init()
return r
}
func (r *Recorder) init() {
r.clearQueuedSpans()
if r.testMode {
return
}
ticker := time.NewTicker(1 * time.Second)
go func() {
for range ticker.C {
if sensor.agent.canSend() {
r.send()
}
}
}()
}
// RecordSpan accepts spans to be recorded and and added to the span queue
// for eventual reporting to the host agent.
func (r *Recorder) RecordSpan(span *spanS) {
// If we're not announced and not in test mode then just
// return
if !r.testMode && !sensor.agent.canSend() {
return
}
var data = &jsonData{}
kind := span.getSpanKind()
data.SDK = &jsonSDKData{
Name: span.Operation,
Type: kind,
Custom: &jsonCustomData{Tags: span.Tags, Logs: span.collectLogs()}}
baggage := make(map[string]string)
span.context.ForeachBaggageItem(func(k string, v string) bool {
baggage[k] = v
return true
})
if len(baggage) > 0 {
data.SDK.Custom.Baggage = baggage
}
data.Service = sensor.serviceName
var parentID *int64
if span.ParentSpanID == 0 {
parentID = nil
} else {
parentID = &span.ParentSpanID
}
r.Lock()
defer r.Unlock()
if len(r.spans) == sensor.options.MaxBufferedSpans {
r.spans = r.spans[1:]
}
r.spans = append(r.spans, jsonSpan{
TraceID: span.context.TraceID,
ParentID: parentID,
SpanID: span.context.SpanID,
Timestamp: uint64(span.Start.UnixNano()) / uint64(time.Millisecond),
Duration: uint64(span.Duration) / uint64(time.Millisecond),
Name: "sdk",
Error: span.Error,
Ec: span.Ec,
Lang: "go",
From: sensor.agent.from,
Data: data})
if r.testMode || !sensor.agent.canSend() {
return
}
if len(r.spans) >= sensor.options.ForceTransmissionStartingAt {
log.debug("Forcing spans to agent. Count:", len(r.spans))
go r.send()
}
}
// QueuedSpansCount returns the number of queued spans
// Used only in tests currently.
func (r *Recorder) QueuedSpansCount() int {
r.RLock()
defer r.RUnlock()
return len(r.spans)
}
// GetQueuedSpans returns a copy of the queued spans and clears the queue.
func (r *Recorder) GetQueuedSpans() []jsonSpan {
r.Lock()
defer r.Unlock()
// Copy queued spans
queuedSpans := make([]jsonSpan, len(r.spans))
copy(queuedSpans, r.spans)
// and clear out the source
r.clearQueuedSpans()
return queuedSpans
}
// clearQueuedSpans brings the span queue to empty/0/nada
// This function doesn't take the Lock so make sure to have
// the write lock before calling.
// This is meant to be called from GetQueuedSpans which handles
// locking.
func (r *Recorder) clearQueuedSpans() {
var mbs int
if len(r.spans) > 0 {
if sensor != nil {
mbs = sensor.options.MaxBufferedSpans
} else {
mbs = DefaultMaxBufferedSpans
}
r.spans = make([]jsonSpan, 0, mbs)
}
}
// Retrieve the queued spans and post them to the host agent asynchronously.
func (r *Recorder) send() {
spansToSend := r.GetQueuedSpans()
if len(spansToSend) > 0 {
go func() {
_, err := sensor.agent.request(sensor.agent.makeURL(agentTracesURL), "POST", spansToSend)
if err != nil {
log.debug("Posting traces failed in send(): ", err)
sensor.agent.reset()
}
}()
}
}