1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
package main
import (
"crypto/tls"
"crypto/x509"
"flag"
"log"
"os"
"os/signal"
"strings"
"github.com/IBM/sarama"
)
func init() {
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}
var (
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
version = flag.String("version", sarama.DefaultVersion.String(), "Kafka cluster version")
userName = flag.String("username", "", "The SASL username")
passwd = flag.String("passwd", "", "The SASL password")
algorithm = flag.String("algorithm", "", "The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism")
topic = flag.String("topic", "default_topic", "The Kafka topic to use")
certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
keyFile = flag.String("key", "", "The optional key file for client authentication")
caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
tlsSkipVerify = flag.Bool("tls-skip-verify", false, "Whether to skip TLS server cert verification")
useTLS = flag.Bool("tls", false, "Use TLS to communicate with the cluster")
mode = flag.String("mode", "produce", "Mode to run in: \"produce\" to produce, \"consume\" to consume")
logMsg = flag.Bool("logmsg", false, "True to log consumed messages to console")
logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
)
func createTLSConfiguration() (t *tls.Config) {
t = &tls.Config{
InsecureSkipVerify: *tlsSkipVerify,
}
if *certFile != "" && *keyFile != "" && *caFile != "" {
cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
if err != nil {
log.Fatal(err)
}
caCert, err := os.ReadFile(*caFile)
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
t = &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: *tlsSkipVerify,
}
}
return t
}
func main() {
flag.Parse()
if *brokers == "" {
log.Fatalln("at least one broker is required")
}
splitBrokers := strings.Split(*brokers, ",")
version, err := sarama.ParseKafkaVersion(*version)
if err != nil {
log.Panicf("Error parsing Kafka version: %v", err)
}
if *userName == "" {
log.Fatalln("SASL username is required")
}
if *passwd == "" {
log.Fatalln("SASL password is required")
}
conf := sarama.NewConfig()
conf.Producer.Retry.Max = 1
conf.Producer.RequiredAcks = sarama.WaitForAll
conf.Producer.Return.Successes = true
conf.Version = version
conf.ClientID = "sasl_scram_client"
conf.Metadata.Full = true
conf.Net.SASL.Enable = true
conf.Net.SASL.User = *userName
conf.Net.SASL.Password = *passwd
conf.Net.SASL.Handshake = true
if *algorithm == "sha512" {
conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
} else if *algorithm == "sha256" {
conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
} else {
log.Fatalf("invalid SHA algorithm \"%s\": can be either \"sha256\" or \"sha512\"", *algorithm)
}
if *useTLS {
conf.Net.TLS.Enable = true
conf.Net.TLS.Config = createTLSConfiguration()
}
if *mode == "consume" {
consumer, err := sarama.NewConsumer(splitBrokers, conf)
if err != nil {
panic(err)
}
log.Println("consumer created")
defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}()
log.Println("commence consuming")
partitionConsumer, err := consumer.ConsumePartition(*topic, 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatalln(err)
}
}()
// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
consumed := 0
ConsumerLoop:
for {
log.Println("in the for")
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message offset %d\n", msg.Offset)
if *logMsg {
log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
}
consumed++
case <-signals:
break ConsumerLoop
}
}
log.Printf("Consumed: %d\n", consumed)
} else {
syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
if err != nil {
logger.Fatalln("failed to create producer: ", err)
}
partition, offset, err := syncProducer.SendMessage(&sarama.ProducerMessage{
Topic: *topic,
Value: sarama.StringEncoder("test_message"),
})
if err != nil {
logger.Fatalln("failed to send message to ", *topic, err)
}
logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
_ = syncProducer.Close()
}
logger.Println("Bye now !")
}
|