268 lines
7.7 KiB
Go
268 lines
7.7 KiB
Go
|
// Copyright (c) 2016 Uber Technologies, Inc.
|
||
|
//
|
||
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
||
|
// of this software and associated documentation files (the "Software"), to deal
|
||
|
// in the Software without restriction, including without limitation the rights
|
||
|
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||
|
// copies of the Software, and to permit persons to whom the Software is
|
||
|
// furnished to do so, subject to the following conditions:
|
||
|
//
|
||
|
// The above copyright notice and this permission notice shall be included in
|
||
|
// all copies or substantial portions of the Software.
|
||
|
//
|
||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||
|
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||
|
// THE SOFTWARE.
|
||
|
|
||
|
package jaeger
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"github.com/opentracing/opentracing-go"
|
||
|
|
||
|
"github.com/uber/jaeger-client-go/log"
|
||
|
)
|
||
|
|
||
|
// Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
|
||
|
type Reporter interface {
|
||
|
// Report submits a new span to collectors, possibly asynchronously and/or with buffering.
|
||
|
Report(span *Span)
|
||
|
|
||
|
// Close does a clean shutdown of the reporter, flushing any traces that may be buffered in memory.
|
||
|
Close()
|
||
|
}
|
||
|
|
||
|
// ------------------------------
|
||
|
|
||
|
type nullReporter struct{}
|
||
|
|
||
|
// NewNullReporter creates a no-op reporter that ignores all reported spans.
|
||
|
func NewNullReporter() Reporter {
|
||
|
return &nullReporter{}
|
||
|
}
|
||
|
|
||
|
// Report implements Report() method of Reporter by doing nothing.
|
||
|
func (r *nullReporter) Report(span *Span) {
|
||
|
// no-op
|
||
|
}
|
||
|
|
||
|
// Close implements Close() method of Reporter by doing nothing.
|
||
|
func (r *nullReporter) Close() {
|
||
|
// no-op
|
||
|
}
|
||
|
|
||
|
// ------------------------------
|
||
|
|
||
|
type loggingReporter struct {
|
||
|
logger Logger
|
||
|
}
|
||
|
|
||
|
// NewLoggingReporter creates a reporter that logs all reported spans to provided logger.
|
||
|
func NewLoggingReporter(logger Logger) Reporter {
|
||
|
return &loggingReporter{logger}
|
||
|
}
|
||
|
|
||
|
// Report implements Report() method of Reporter by logging the span to the logger.
|
||
|
func (r *loggingReporter) Report(span *Span) {
|
||
|
r.logger.Infof("Reporting span %+v", span)
|
||
|
}
|
||
|
|
||
|
// Close implements Close() method of Reporter by doing nothing.
|
||
|
func (r *loggingReporter) Close() {
|
||
|
// no-op
|
||
|
}
|
||
|
|
||
|
// ------------------------------
|
||
|
|
||
|
// InMemoryReporter is used for testing, and simply collects spans in memory.
|
||
|
type InMemoryReporter struct {
|
||
|
spans []opentracing.Span
|
||
|
lock sync.Mutex
|
||
|
}
|
||
|
|
||
|
// NewInMemoryReporter creates a reporter that stores spans in memory.
|
||
|
// NOTE: the Tracer should be created with options.PoolSpans = false.
|
||
|
func NewInMemoryReporter() *InMemoryReporter {
|
||
|
return &InMemoryReporter{
|
||
|
spans: make([]opentracing.Span, 0, 10),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Report implements Report() method of Reporter by storing the span in the buffer.
|
||
|
func (r *InMemoryReporter) Report(span *Span) {
|
||
|
r.lock.Lock()
|
||
|
r.spans = append(r.spans, span)
|
||
|
r.lock.Unlock()
|
||
|
}
|
||
|
|
||
|
// Close implements Close() method of Reporter by doing nothing.
|
||
|
func (r *InMemoryReporter) Close() {
|
||
|
// no-op
|
||
|
}
|
||
|
|
||
|
// SpansSubmitted returns the number of spans accumulated in the buffer.
|
||
|
func (r *InMemoryReporter) SpansSubmitted() int {
|
||
|
r.lock.Lock()
|
||
|
defer r.lock.Unlock()
|
||
|
return len(r.spans)
|
||
|
}
|
||
|
|
||
|
// GetSpans returns accumulated spans as a copy of the buffer.
|
||
|
func (r *InMemoryReporter) GetSpans() []opentracing.Span {
|
||
|
r.lock.Lock()
|
||
|
defer r.lock.Unlock()
|
||
|
copied := make([]opentracing.Span, len(r.spans))
|
||
|
copy(copied, r.spans)
|
||
|
return copied
|
||
|
}
|
||
|
|
||
|
// Reset clears all accumulated spans.
|
||
|
func (r *InMemoryReporter) Reset() {
|
||
|
r.lock.Lock()
|
||
|
defer r.lock.Unlock()
|
||
|
r.spans = nil
|
||
|
}
|
||
|
|
||
|
// ------------------------------
|
||
|
|
||
|
type compositeReporter struct {
|
||
|
reporters []Reporter
|
||
|
}
|
||
|
|
||
|
// NewCompositeReporter creates a reporter that ignores all reported spans.
|
||
|
func NewCompositeReporter(reporters ...Reporter) Reporter {
|
||
|
return &compositeReporter{reporters: reporters}
|
||
|
}
|
||
|
|
||
|
// Report implements Report() method of Reporter by delegating to each underlying reporter.
|
||
|
func (r *compositeReporter) Report(span *Span) {
|
||
|
for _, reporter := range r.reporters {
|
||
|
reporter.Report(span)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Close implements Close() method of Reporter by closing each underlying reporter.
|
||
|
func (r *compositeReporter) Close() {
|
||
|
for _, reporter := range r.reporters {
|
||
|
reporter.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ------------------------------
|
||
|
|
||
|
const (
|
||
|
defaultQueueSize = 100
|
||
|
defaultBufferFlushInterval = 10 * time.Second
|
||
|
)
|
||
|
|
||
|
type remoteReporter struct {
|
||
|
// must be first in the struct because `sync/atomic` expects 64-bit alignment.
|
||
|
// Cf. https://github.com/uber/jaeger-client-go/issues/155, https://goo.gl/zW7dgq
|
||
|
queueLength int64
|
||
|
|
||
|
reporterOptions
|
||
|
sender Transport
|
||
|
queue chan *Span
|
||
|
queueDrained sync.WaitGroup
|
||
|
flushSignal chan *sync.WaitGroup
|
||
|
}
|
||
|
|
||
|
// NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender
|
||
|
func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
|
||
|
options := reporterOptions{}
|
||
|
for _, option := range opts {
|
||
|
option(&options)
|
||
|
}
|
||
|
if options.bufferFlushInterval <= 0 {
|
||
|
options.bufferFlushInterval = defaultBufferFlushInterval
|
||
|
}
|
||
|
if options.logger == nil {
|
||
|
options.logger = log.NullLogger
|
||
|
}
|
||
|
if options.metrics == nil {
|
||
|
options.metrics = NewNullMetrics()
|
||
|
}
|
||
|
if options.queueSize <= 0 {
|
||
|
options.queueSize = defaultQueueSize
|
||
|
}
|
||
|
reporter := &remoteReporter{
|
||
|
reporterOptions: options,
|
||
|
sender: sender,
|
||
|
flushSignal: make(chan *sync.WaitGroup),
|
||
|
queue: make(chan *Span, options.queueSize),
|
||
|
}
|
||
|
go reporter.processQueue()
|
||
|
return reporter
|
||
|
}
|
||
|
|
||
|
// Report implements Report() method of Reporter.
|
||
|
// It passes the span to a background go-routine for submission to Jaeger.
|
||
|
func (r *remoteReporter) Report(span *Span) {
|
||
|
select {
|
||
|
case r.queue <- span:
|
||
|
atomic.AddInt64(&r.queueLength, 1)
|
||
|
default:
|
||
|
r.metrics.ReporterDropped.Inc(1)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Close implements Close() method of Reporter by waiting for the queue to be drained.
|
||
|
func (r *remoteReporter) Close() {
|
||
|
r.queueDrained.Add(1)
|
||
|
close(r.queue)
|
||
|
r.queueDrained.Wait()
|
||
|
r.sender.Close()
|
||
|
}
|
||
|
|
||
|
// processQueue reads spans from the queue, converts them to Thrift, and stores them in an internal buffer.
|
||
|
// When the buffer length reaches batchSize, it is flushed by submitting the accumulated spans to Jaeger.
|
||
|
// Buffer also gets flushed automatically every batchFlushInterval seconds, just in case the tracer stopped
|
||
|
// reporting new spans.
|
||
|
func (r *remoteReporter) processQueue() {
|
||
|
timer := time.NewTicker(r.bufferFlushInterval)
|
||
|
for {
|
||
|
select {
|
||
|
case span, ok := <-r.queue:
|
||
|
if ok {
|
||
|
atomic.AddInt64(&r.queueLength, -1)
|
||
|
if flushed, err := r.sender.Append(span); err != nil {
|
||
|
r.metrics.ReporterFailure.Inc(int64(flushed))
|
||
|
r.logger.Error(err.Error())
|
||
|
} else if flushed > 0 {
|
||
|
r.metrics.ReporterSuccess.Inc(int64(flushed))
|
||
|
// to reduce the number of gauge stats, we only emit queue length on flush
|
||
|
r.metrics.ReporterQueueLength.Update(atomic.LoadInt64(&r.queueLength))
|
||
|
}
|
||
|
} else {
|
||
|
// queue closed
|
||
|
timer.Stop()
|
||
|
r.flush()
|
||
|
r.queueDrained.Done()
|
||
|
return
|
||
|
}
|
||
|
case <-timer.C:
|
||
|
r.flush()
|
||
|
case wg := <-r.flushSignal: // for testing
|
||
|
r.flush()
|
||
|
wg.Done()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// flush causes the Sender to flush its accumulated spans and clear the buffer
|
||
|
func (r *remoteReporter) flush() {
|
||
|
if flushed, err := r.sender.Flush(); err != nil {
|
||
|
r.metrics.ReporterFailure.Inc(int64(flushed))
|
||
|
r.logger.Error(err.Error())
|
||
|
} else if flushed > 0 {
|
||
|
r.metrics.ReporterSuccess.Inc(int64(flushed))
|
||
|
}
|
||
|
}
|