1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
|
/*
Package kafka implements a Kafka reporter to send spans to a Kafka server/cluster.
*/
package kafka
import (
"encoding/json"
"log"
"os"
"github.com/Shopify/sarama"
"github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/reporter"
)
// defaultKafkaTopic sets the standard Kafka topic our Reporter will publish
// on. The default topic for zipkin-receiver-kafka is "zipkin", see:
// https://github.com/openzipkin/zipkin/tree/master/zipkin-receiver-kafka
const defaultKafkaTopic = "zipkin"
// kafkaReporter implements Reporter by publishing spans to a Kafka
// broker.
type kafkaReporter struct {
producer sarama.AsyncProducer
logger *log.Logger
topic string
}
// ReporterOption sets a parameter for the kafkaReporter
type ReporterOption func(c *kafkaReporter)
// Logger sets the logger used to report errors in the collection
// process.
func Logger(logger *log.Logger) ReporterOption {
return func(c *kafkaReporter) {
c.logger = logger
}
}
// Producer sets the producer used to produce to Kafka.
func Producer(p sarama.AsyncProducer) ReporterOption {
return func(c *kafkaReporter) {
c.producer = p
}
}
// Topic sets the kafka topic to attach the reporter producer on.
func Topic(t string) ReporterOption {
return func(c *kafkaReporter) {
c.topic = t
}
}
// NewReporter returns a new Kafka-backed Reporter. address should be a slice of
// TCP endpoints of the form "host:port".
func NewReporter(address []string, options ...ReporterOption) (reporter.Reporter, error) {
r := &kafkaReporter{
logger: log.New(os.Stderr, "", log.LstdFlags),
topic: defaultKafkaTopic,
}
for _, option := range options {
option(r)
}
if r.producer == nil {
p, err := sarama.NewAsyncProducer(address, nil)
if err != nil {
return nil, err
}
r.producer = p
}
go r.logErrors()
return r, nil
}
func (r *kafkaReporter) logErrors() {
for pe := range r.producer.Errors() {
r.logger.Print("msg", pe.Msg, "err", pe.Err, "result", "failed to produce msg")
}
}
func (r *kafkaReporter) Send(s model.SpanModel) {
// Zipkin expects the message to be wrapped in an array
ss := []model.SpanModel{s}
m, err := json.Marshal(ss)
if err != nil {
r.logger.Printf("failed when marshalling the span: %s\n", err.Error())
return
}
r.producer.Input() <- &sarama.ProducerMessage{
Topic: r.topic,
Key: nil,
Value: sarama.ByteEncoder(m),
}
}
func (r *kafkaReporter) Close() error {
return r.producer.Close()
}
|