175 lines
3.9 KiB
Go
175 lines
3.9 KiB
Go
package instana
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// A SpanRecorder handles all of the `RawSpan` data generated via an
|
|
// associated `Tracer` (see `NewStandardTracer`) instance. It also names
|
|
// the containing process and provides access to a straightforward tag map.
|
|
type SpanRecorder interface {
|
|
// Implementations must determine whether and where to store `span`.
|
|
RecordSpan(span *spanS)
|
|
}
|
|
|
|
// Recorder accepts spans, processes and queues them
|
|
// for delivery to the backend.
|
|
type Recorder struct {
|
|
sync.RWMutex
|
|
spans []jsonSpan
|
|
testMode bool
|
|
}
|
|
|
|
// NewRecorder Establish a Recorder span recorder
|
|
func NewRecorder() *Recorder {
|
|
r := new(Recorder)
|
|
r.init()
|
|
return r
|
|
}
|
|
|
|
// NewTestRecorder Establish a new span recorder used for testing
|
|
func NewTestRecorder() *Recorder {
|
|
r := new(Recorder)
|
|
r.testMode = true
|
|
r.init()
|
|
return r
|
|
}
|
|
|
|
func (r *Recorder) init() {
|
|
r.clearQueuedSpans()
|
|
|
|
if r.testMode {
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
go func() {
|
|
for range ticker.C {
|
|
if sensor.agent.canSend() {
|
|
r.send()
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// RecordSpan accepts spans to be recorded and and added to the span queue
|
|
// for eventual reporting to the host agent.
|
|
func (r *Recorder) RecordSpan(span *spanS) {
|
|
// If we're not announced and not in test mode then just
|
|
// return
|
|
if !r.testMode && !sensor.agent.canSend() {
|
|
return
|
|
}
|
|
|
|
var data = &jsonData{}
|
|
kind := span.getSpanKind()
|
|
|
|
data.SDK = &jsonSDKData{
|
|
Name: span.Operation,
|
|
Type: kind,
|
|
Custom: &jsonCustomData{Tags: span.Tags, Logs: span.collectLogs()}}
|
|
|
|
baggage := make(map[string]string)
|
|
span.context.ForeachBaggageItem(func(k string, v string) bool {
|
|
baggage[k] = v
|
|
|
|
return true
|
|
})
|
|
|
|
if len(baggage) > 0 {
|
|
data.SDK.Custom.Baggage = baggage
|
|
}
|
|
|
|
data.Service = sensor.serviceName
|
|
|
|
var parentID *int64
|
|
if span.ParentSpanID == 0 {
|
|
parentID = nil
|
|
} else {
|
|
parentID = &span.ParentSpanID
|
|
}
|
|
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
|
|
if len(r.spans) == sensor.options.MaxBufferedSpans {
|
|
r.spans = r.spans[1:]
|
|
}
|
|
|
|
r.spans = append(r.spans, jsonSpan{
|
|
TraceID: span.context.TraceID,
|
|
ParentID: parentID,
|
|
SpanID: span.context.SpanID,
|
|
Timestamp: uint64(span.Start.UnixNano()) / uint64(time.Millisecond),
|
|
Duration: uint64(span.Duration) / uint64(time.Millisecond),
|
|
Name: "sdk",
|
|
Error: span.Error,
|
|
Ec: span.Ec,
|
|
Lang: "go",
|
|
From: sensor.agent.from,
|
|
Data: data})
|
|
|
|
if r.testMode || !sensor.agent.canSend() {
|
|
return
|
|
}
|
|
|
|
if len(r.spans) >= sensor.options.ForceTransmissionStartingAt {
|
|
log.debug("Forcing spans to agent. Count:", len(r.spans))
|
|
go r.send()
|
|
}
|
|
}
|
|
|
|
// QueuedSpansCount returns the number of queued spans
|
|
// Used only in tests currently.
|
|
func (r *Recorder) QueuedSpansCount() int {
|
|
r.RLock()
|
|
defer r.RUnlock()
|
|
return len(r.spans)
|
|
}
|
|
|
|
// GetQueuedSpans returns a copy of the queued spans and clears the queue.
|
|
func (r *Recorder) GetQueuedSpans() []jsonSpan {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
|
|
// Copy queued spans
|
|
queuedSpans := make([]jsonSpan, len(r.spans))
|
|
copy(queuedSpans, r.spans)
|
|
|
|
// and clear out the source
|
|
r.clearQueuedSpans()
|
|
return queuedSpans
|
|
}
|
|
|
|
// clearQueuedSpans brings the span queue to empty/0/nada
|
|
// This function doesn't take the Lock so make sure to have
|
|
// the write lock before calling.
|
|
// This is meant to be called from GetQueuedSpans which handles
|
|
// locking.
|
|
func (r *Recorder) clearQueuedSpans() {
|
|
var mbs int
|
|
|
|
if len(r.spans) > 0 {
|
|
if sensor != nil {
|
|
mbs = sensor.options.MaxBufferedSpans
|
|
} else {
|
|
mbs = DefaultMaxBufferedSpans
|
|
}
|
|
r.spans = make([]jsonSpan, 0, mbs)
|
|
}
|
|
}
|
|
|
|
// Retrieve the queued spans and post them to the host agent asynchronously.
|
|
func (r *Recorder) send() {
|
|
spansToSend := r.GetQueuedSpans()
|
|
if len(spansToSend) > 0 {
|
|
go func() {
|
|
_, err := sensor.agent.request(sensor.agent.makeURL(agentTracesURL), "POST", spansToSend)
|
|
if err != nil {
|
|
log.debug("Posting traces failed in send(): ", err)
|
|
sensor.agent.reset()
|
|
}
|
|
}()
|
|
}
|
|
}
|