File: example_test.go

package info (click to toggle)
golang-github-optiopay-kafka 0.0~git20150921.0.bc8e095-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 400 kB
  • ctags: 461
  • sloc: sh: 45; makefile: 2
file content (133 lines) | stat: -rw-r--r-- 2,930 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package kafka

import (
	"fmt"

	"github.com/optiopay/kafka/proto"
)

func ExampleConsumer() {
	// connect to kafka cluster
	addresses := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(addresses, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	// create new consumer
	conf := NewConsumerConf("my-messages", 0)
	conf.StartOffset = StartOffsetNewest
	consumer, err := broker.Consumer(conf)
	if err != nil {
		panic(err)
	}

	// read all messages
	for {
		msg, err := consumer.Consume()
		if err != nil {
			if err == ErrNoData {
				break
			}
			panic(err)
		}

		fmt.Printf("message: %#v", msg)
	}
}

func ExampleOffsetCoordinator() {
	// connect to kafka cluster
	addresses := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(addresses, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	// create offset coordinator and customize configuration
	conf := NewOffsetCoordinatorConf("my-consumer-group")
	conf.RetryErrLimit = 20
	coordinator, err := broker.OffsetCoordinator(conf)
	if err != nil {
		panic(err)
	}

	// write consumed message offset for topic/partition
	if err := coordinator.Commit("my-topic", 0, 12); err != nil {
		panic(err)
	}

	// get latest consumed offset for given topic/partition
	off, _, err := coordinator.Offset("my-topic", 0)
	if err != nil {
		panic(err)
	}

	if off != 12 {
		panic(fmt.Sprintf("offset is %d, not 12", off))
	}
}

func ExampleProducer() {
	// connect to kafka cluster
	addresses := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(addresses, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	// create new producer
	conf := NewProducerConf()
	conf.RequiredAcks = proto.RequiredAcksLocal

	// write two messages to kafka using single call to make it atomic
	producer := broker.Producer(conf)
	messages := []*proto.Message{
		{Value: []byte("first")},
		{Value: []byte("second")},
	}
	if _, err := producer.Produce("my-messages", 0, messages...); err != nil {
		panic(err)
	}
}

func ExampleMerge() {
	// connect to kafka cluster
	addresses := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(addresses, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	topics := []string{"fruits", "vegetables"}
	fetchers := make([]Consumer, len(topics))

	// create consumers for different topics
	for i, topic := range topics {
		conf := NewConsumerConf(topic, 0)
		conf.RetryLimit = 20
		conf.StartOffset = StartOffsetNewest
		consumer, err := broker.Consumer(conf)
		if err != nil {
			panic(err)
		}
		fetchers[i] = consumer
	}

	// merge all created consumers (they don't even have to belong to the same broker!)
	mx := Merge(fetchers...)
	defer mx.Close()

	// consume messages from all sources
	for {
		msg, err := mx.Consume()
		if err != nil {
			panic(err)
		}
		fmt.Printf("message: %#v", msg)
	}
}