package kafka
import (
"fmt"
"github.com/optiopay/kafka/v2/proto"
)
func ExampleConsumer() {
	// Establish a connection to the Kafka cluster.
	nodes := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(nodes, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	// Configure and create a consumer for a single topic/partition,
	// starting from the newest available offset.
	consConf := NewConsumerConf("my-messages", 0)
	consConf.StartOffset = StartOffsetNewest
	consumer, err := broker.Consumer(consConf)
	if err != nil {
		panic(err)
	}

	// Drain messages until the consumer reports there is no more data.
	for {
		msg, err := consumer.Consume()
		if err == ErrNoData {
			break
		}
		if err != nil {
			panic(err)
		}
		fmt.Printf("message: %#v", msg)
	}
}
func ExampleOffsetCoordinator() {
	// Establish a connection to the Kafka cluster.
	nodes := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(nodes, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	// Build an offset coordinator for the consumer group, raising the
	// retry limit beyond the default.
	coordConf := NewOffsetCoordinatorConf("my-consumer-group")
	coordConf.RetryErrLimit = 20
	coordinator, err := broker.OffsetCoordinator(coordConf)
	if err != nil {
		panic(err)
	}

	// Persist the offset of the last consumed message for the
	// topic/partition pair.
	if err = coordinator.Commit("my-topic", 0, 12); err != nil {
		panic(err)
	}

	// Read the committed offset back and verify it round-tripped.
	off, _, err := coordinator.Offset("my-topic", 0)
	if err != nil {
		panic(err)
	}
	if off != 12 {
		panic(fmt.Sprintf("offset is %d, not 12", off))
	}
}
func ExampleProducer() {
	// Establish a connection to the Kafka cluster.
	nodes := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(nodes, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	// Configure the producer to wait only for the partition leader's
	// local write acknowledgement.
	prodConf := NewProducerConf()
	prodConf.RequiredAcks = proto.RequiredAcksLocal
	producer := broker.Producer(prodConf)

	// A single Produce call writes both messages, keeping the pair atomic.
	batch := []*proto.Message{
		{Value: []byte("first")},
		{Value: []byte("second")},
	}
	if _, err = producer.Produce("my-messages", 0, batch...); err != nil {
		panic(err)
	}
}
func ExampleMerge() {
	// Establish a connection to the Kafka cluster.
	nodes := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(nodes, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	// Build one consumer per topic; merged consumers do not even have to
	// belong to the same broker.
	topics := []string{"fruits", "vegetables"}
	fetchers := make([]Consumer, 0, len(topics))
	for _, topic := range topics {
		consConf := NewConsumerConf(topic, 0)
		consConf.RetryLimit = 20
		consConf.StartOffset = StartOffsetNewest
		consumer, err := broker.Consumer(consConf)
		if err != nil {
			panic(err)
		}
		fetchers = append(fetchers, consumer)
	}

	// Multiplex every source into a single message stream.
	mx := Merge(fetchers...)
	defer mx.Close()

	// Consume messages from all sources.
	for {
		msg, err := mx.Consume()
		if err != nil {
			panic(err)
		}
		fmt.Printf("message: %#v", msg)
	}
}