File: trace_interceptor.go

package info (click to toggle)
golang-github-ibm-sarama 1.45.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 2,964 kB
  • sloc: makefile: 35; sh: 19
file content (90 lines) | stat: -rw-r--r-- 2,727 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main

import (
	"context"
	"strings"

	"go.opentelemetry.io/otel/attribute"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	"go.opentelemetry.io/otel/trace"

	"github.com/IBM/sarama"
)

// OTelInterceptor holds the tracer and the constant span attributes used by
// its OnSend method, which starts a span per *sarama.ProducerMessage and
// stores the span identifiers in the message headers.
type OTelInterceptor struct {
	// tracer starts the span created for each intercepted message.
	tracer     trace.Tracer
	// fixedAttrs are the attributes applied to every span; they are built
	// once in NewOTelInterceptor.
	fixedAttrs []attribute.KeyValue
}

// NewOTelInterceptor returns an OTelInterceptor backed by a tracer from a
// newly created SDK tracer provider. The brokers are joined with commas and
// recorded in the "messaging.url" attribute that, together with the other
// fixed attributes, is set on every span the interceptor creates.
func NewOTelInterceptor(brokers []string) *OTelInterceptor {
	provider := sdktrace.NewTracerProvider()
	tracer := provider.Tracer("github.com/IBM/sarama/examples/interceptors")

	// Attribute names are based on the messaging semantic conventions spec,
	// which was reachable as of 2020-05-15:
	// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
	brokerList := strings.Join(brokers, ",")
	attrs := []attribute.KeyValue{
		attribute.String("messaging.destination_kind", "topic"),
		attribute.String("span.otel.kind", "PRODUCER"),
		attribute.String("messaging.system", "kafka"),
		attribute.String("net.transport", "IP.TCP"),
		attribute.String("messaging.url", brokerList),
	}

	return &OTelInterceptor{
		tracer:     tracer,
		fixedAttrs: attrs,
	}
}

// Keys of the record headers that OnSend writes on each message and that
// shouldIgnoreMsg looks for to recognise an already-processed message.
const (
	// MessageIDHeaderName is the header key for the message ID; OnSend fills
	// it with the span ID.
	MessageIDHeaderName = "message_id"
	// SpanHeaderName is the header key for the span ID.
	SpanHeaderName      = "span_id"
	// TraceHeaderName is the header key for the trace ID.
	TraceHeaderName     = "trace_id"
)

// shouldIgnoreMsg reports whether msg already carries all three tracing
// headers (trace ID, span ID and message ID). Such a message has been through
// the interceptor before (e.g. on a retry) and must not be traced again.
func shouldIgnoreMsg(msg *sarama.ProducerMessage) bool {
	var hasTrace, hasSpan, hasMsgID bool
	for i := range msg.Headers {
		switch string(msg.Headers[i].Key) {
		case TraceHeaderName:
			hasTrace = true
		case SpanHeaderName:
			hasSpan = true
		case MessageIDHeaderName:
			hasMsgID = true
		}
	}
	return hasTrace && hasSpan && hasMsgID
}

// OnSend starts (and, on return, ends) a span named after the message topic,
// sets the fixed attributes plus the topic and message ID on it, and rewrites
// the message headers so they hold exactly one trace ID, span ID and message
// ID header taken from the span context. Messages that already carry all
// three tracing headers are left untouched.
func (oi *OTelInterceptor) OnSend(msg *sarama.ProducerMessage) {
	if shouldIgnoreMsg(msg) {
		return
	}

	_, span := oi.tracer.Start(context.TODO(), msg.Topic)
	defer span.End()

	sc := span.SpanContext()
	traceID := sc.TraceID().String()
	spanID := sc.SpanID().String()

	// The span ID doubles as the message ID.
	spanAttrs := append(
		oi.fixedAttrs,
		attribute.String("messaging.destination", msg.Topic),
		attribute.String("messaging.message_id", spanID),
	)
	span.SetAttributes(spanAttrs...)

	// Drop any partial set of tracing headers left on the message, filtering
	// in place so the existing backing array of msg.Headers is reused.
	kept := msg.Headers[:0]
	for i := range msg.Headers {
		switch string(msg.Headers[i].Key) {
		case TraceHeaderName, SpanHeaderName, MessageIDHeaderName:
			continue
		}
		kept = append(kept, msg.Headers[i])
	}

	msg.Headers = append(
		kept,
		sarama.RecordHeader{Key: []byte(TraceHeaderName), Value: []byte(traceID)},
		sarama.RecordHeader{Key: []byte(SpanHeaderName), Value: []byte(spanID)},
		sarama.RecordHeader{Key: []byte(MessageIDHeaderName), Value: []byte(spanID)},
	)
}