File: main.go

package info (click to toggle)
golang-github-ibm-sarama 1.45.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 2,964 kB
  • sloc: makefile: 35; sh: 19
file content (83 lines) | stat: -rw-r--r-- 2,252 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"time"

	stdout "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"

	"github.com/IBM/sarama"
)

// Command-line flags that configure the Kafka connection and target topic.
var (
	topic   = flag.String("topic", "default_topic", "The Kafka topic to use")
	version = flag.String("version", sarama.DefaultVersion.String(), "Kafka cluster version")
	brokers = flag.String("brokers", "localhost:9092", "The Kafka brokers to connect to, as a comma separated list")
)

// logger writes this example's own log lines to stdout, prefixed with
// "[OTelInterceptor] " and the standard date/time flags.
var logger = log.New(os.Stdout, "[OTelInterceptor] ", log.LstdFlags)

// main runs a sample asynchronous Kafka producer that has an OpenTelemetry
// producer interceptor installed.
//
// It parses the -brokers, -version and -topic flags, sets up the stdout
// metric exporter, creates the producer, and then sends a small batch of
// test messages to the topic on every ticker tick until SIGINT is received.
// On shutdown the producer is closed synchronously so that buffered messages
// are flushed before the process exits.
func main() {
	flag.Parse()

	if *brokers == "" {
		logger.Fatalln("at least one broker is required")
	}
	splitBrokers := strings.Split(*brokers, ",")
	sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)

	// Named kafkaVersion so it does not shadow the package-level version flag.
	kafkaVersion, err := sarama.ParseKafkaVersion(*version)
	if err != nil {
		logger.Panicf("Error parsing Kafka version: %v", err)
	}

	// oTel stdout example
	pusher, err := stdout.New()
	if err != nil {
		logger.Fatalf("failed to initialize stdout export pipeline: %v", err)
	}
	defer func() {
		if err := pusher.Shutdown(context.Background()); err != nil {
			logger.Printf("failed to shut down stdout export pipeline: %v", err)
		}
	}()

	// simple sarama producer that adds a new producer interceptor
	conf := sarama.NewConfig()
	conf.Version = kafkaVersion
	conf.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor(splitBrokers)}

	producer, err := sarama.NewAsyncProducer(splitBrokers, conf)
	if err != nil {
		// Panic (rather than Fatal) so the deferred exporter shutdown still runs.
		logger.Panicf("Couldn't create a Kafka producer: %v", err)
	}
	// Close (unlike AsyncClose) waits for buffered messages to be flushed, so
	// returning from main does not drop in-flight messages.
	defer func() {
		if err := producer.Close(); err != nil {
			logger.Printf("failed to close the Kafka producer cleanly: %v", err)
		}
	}()

	// sarama's AsyncProducer documentation requires the Errors() channel to be
	// read while error reporting is enabled (the NewConfig default); otherwise
	// the producer can stall. This goroutine ends when the producer closes the
	// channel during shutdown.
	go func() {
		for perr := range producer.Errors() {
			logger.Printf("failed to produce message: %v", perr)
		}
	}()

	// kill -2, trap SIGINT to trigger a shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	// ticker
	bulkSize := 2
	duration := 5 * time.Second
	ticker := time.NewTicker(duration)
	defer ticker.Stop()
	logger.Printf("Starting to produce %v messages every %v", bulkSize, duration)
	for {
		select {
		case t := <-ticker.C:
			now := t.Format(time.RFC3339)
			logger.Printf("\nproducing %v messages to topic %s at %s", bulkSize, *topic, now)
			for i := 0; i < bulkSize; i++ {
				producer.Input() <- &sarama.ProducerMessage{
					Topic: *topic, Key: nil,
					Value: sarama.StringEncoder(fmt.Sprintf("test message %v/%v from kafka-client-go-test at %s", i+1, bulkSize, now)),
				}
			}
		case <-signals:
			logger.Println("terminating the program")
			logger.Println("Bye :)")
			return
		}
	}
}