1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
package detect
import (
"context"
"sync"
"time"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)
// TraceRecorder keeps an in-memory copy of exported spans, grouped by trace
// ID, and forwards them to the embedded exporter when one is set.
type TraceRecorder struct {
// SpanExporter is the optional downstream exporter. ExportSpans and Shutdown
// return nil without delegating when it is nil.
sdktrace.SpanExporter
// mu is held around every access to m and listeners in this type's methods.
mu sync.Mutex
// m maps a trace ID to the spans recorded for that trace.
m map[trace.TraceID]*stubs
// listeners counts the outstanding Record registrations per trace ID; gc
// skips any trace that still has an entry here.
listeners map[trace.TraceID]int
// flush, when non-nil, is invoked before a Record callback reads its spans.
// NOTE(review): it is not assigned in the visible code — presumably set
// elsewhere in the package; confirm.
flush func(context.Context) error
}
// stubs holds the spans recorded for a single trace.
type stubs struct {
// spans are the recorded span stubs, appended in export order.
spans []tracetest.SpanStub
// last is the time of the most recent export that touched this trace; gc
// uses it to decide when the entry has expired.
last time.Time
}
// NewTraceRecorder returns a TraceRecorder with empty span and listener maps
// and no downstream exporter.
//
// It also starts a background goroutine that runs gc 60 seconds after
// creation and then 50 seconds after each run completes. The goroutine has no
// stop mechanism, so it lives for the rest of the process.
func NewTraceRecorder() *TraceRecorder {
	rec := &TraceRecorder{
		m:         make(map[trace.TraceID]*stubs),
		listeners: make(map[trace.TraceID]int),
	}

	go func() {
		timer := time.NewTimer(60 * time.Second)
		// timer.C is never closed, so this loop runs forever, waking each
		// time the timer fires.
		for range timer.C {
			rec.gc()
			timer.Reset(50 * time.Second)
		}
	}()

	return rec
}
// Record registers a listener for traceID and returns a function that yields
// the spans recorded for that trace.
//
// While the listener is registered, gc will not evict the trace. The first
// call to the returned function invokes flush (when set), captures the spans
// currently stored for traceID, and releases the listener. Later calls return
// the same captured slice without repeating any of that work.
func (r *TraceRecorder) Record(traceID trace.TraceID) func() []tracetest.SpanStub {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.listeners[traceID]++

	var (
		once     sync.Once
		captured []tracetest.SpanStub
	)

	// snapshot runs at most once, on the first call of the returned function.
	snapshot := func() {
		// Flush is best-effort: its error is intentionally not inspected.
		if r.flush != nil {
			r.flush(context.TODO())
		}

		r.mu.Lock()
		defer r.mu.Unlock()

		if entry, found := r.m[traceID]; found {
			captured = entry.spans
		}

		// Drop this listener; remove the map entry once the count hits zero
		// so gc is free to evict the trace.
		if remaining := r.listeners[traceID] - 1; remaining == 0 {
			delete(r.listeners, traceID)
		} else {
			r.listeners[traceID] = remaining
		}
	}

	return func() []tracetest.SpanStub {
		once.Do(snapshot)
		return captured
	}
}
// gc evicts stored traces that have no registered listener and whose most
// recent export happened more than 60 seconds ago.
func (r *TraceRecorder) gc() {
	const maxIdle = 60 * time.Second

	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()
	for id, entry := range r.m {
		// A trace that is still being listened to is kept regardless of age.
		_, watched := r.listeners[id]
		if !watched && now.Sub(entry.last) > maxIdle {
			delete(r.m, id)
		}
	}
}
// ExportSpans stores a stub copy of every span under its trace ID, stamps the
// trace with the current time, and then forwards the original spans to the
// embedded exporter.
//
// It returns nil when no exporter is embedded; otherwise it returns the
// embedded exporter's result.
func (r *TraceRecorder) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
	// Record inside a closure so the deferred Unlock releases the mutex even
	// if span conversion panics, while the lock is still dropped before the
	// downstream export runs.
	func() {
		r.mu.Lock()
		defer r.mu.Unlock()

		now := time.Now()
		for _, s := range spans {
			ss := tracetest.SpanStubFromReadOnlySpan(s)
			// Derive the map key once, from the stub, so lookup and insert
			// always agree and the raw span is not dereferenced again.
			id := ss.SpanContext.TraceID()
			v, ok := r.m[id]
			if !ok {
				v = &stubs{}
				r.m[id] = v
			}
			v.last = now
			v.spans = append(v.spans, ss)
		}
	}()

	if r.SpanExporter == nil {
		return nil
	}
	return r.SpanExporter.ExportSpans(ctx, spans)
}
// Shutdown delegates to the embedded exporter's Shutdown and returns its
// result. It returns nil when no exporter is embedded.
func (r *TraceRecorder) Shutdown(ctx context.Context) error {
	if next := r.SpanExporter; next != nil {
		return next.Shutdown(ctx)
	}
	return nil
}
|