1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
|
package topics
import (
"context"
"net"
"regexp"
"testing"
"time"
"github.com/segmentio/kafka-go"
ktesting "github.com/segmentio/kafka-go/testing"
)
// TestListReNil checks that ListRe returns an error, rather than succeeding,
// when it is called with a nil client and a nil regular expression.
func TestListReNil(t *testing.T) {
	_, err := ListRe(context.Background(), nil, nil)
	if err == nil {
		// err is nil on this path, so printing it would only show "<nil>";
		// report what was expected instead.
		t.Fatal("ListRe(ctx, nil, nil): expected an error, got nil")
	}
}
// TestListRe creates two topics through a local client and checks that ListRe
// returns only the topics whose names match the supplied regular expression:
// a pattern covering both topics must yield two results, and a pattern naming
// a single topic must yield one.
func TestListRe(t *testing.T) {
	const (
		topicA = "TestTopics-A"
		topicB = "TestTopics-B"
	)

	client, shutdown := newLocalClientWithTopic(topicA, 1)
	defer shutdown()

	// The shutdown function above only removes topicA. Deferred calls run
	// last-in-first-out, so this best-effort deletion of topicB runs before
	// shutdown is invoked. The result is intentionally ignored: a cleanup
	// failure must not mask the outcome of the test itself.
	defer func() {
		_, _ = client.DeleteTopics(context.Background(), &kafka.DeleteTopicsRequest{
			Topics: []string{topicB},
		})
	}()

	if err := clientCreateTopic(client, topicB, 1); err != nil {
		t.Fatalf("creating topic %q: %v", topicB, err)
	}

	matchAll := regexp.MustCompile("TestTopics-.*")
	matchB := regexp.MustCompile("TestTopics-B")

	// Get all the topics.
	topics, err := ListRe(context.Background(), client, matchAll)
	if err != nil {
		t.Fatal(err)
	}
	if len(topics) != 2 {
		t.Errorf("pattern %q: expected 2 topics, got %d", matchAll, len(topics))
	}

	// Get one topic.
	topics, err = ListRe(context.Background(), client, matchB)
	if err != nil {
		t.Fatal(err)
	}
	if len(topics) != 1 {
		t.Errorf("pattern %q: expected 1 topic, got %d", matchB, len(topics))
	}
}
// newLocalClientWithTopic obtains a client from newLocalClient and creates
// topic on it with the requested number of partitions.
//
// If the topic cannot be created, the client's shutdown function is called and
// the function panics with the creation error. Otherwise it returns the client
// together with a cleanup function that requests deletion of the topic and
// then calls the shutdown function.
func newLocalClientWithTopic(topic string, partitions int) (*kafka.Client, func()) {
	client, shutdown := newLocalClient()

	err := clientCreateTopic(client, topic, partitions)
	if err != nil {
		shutdown()
		panic(err)
	}

	cleanup := func() {
		// The deletion result is not inspected; shutdown runs regardless.
		req := &kafka.DeleteTopicsRequest{Topics: []string{topic}}
		client.DeleteTopics(context.Background(), req)
		shutdown()
	}
	return client, cleanup
}
// clientCreateTopic asks the cluster to create topic with the given number of
// partitions and a replication factor of 1, then waits briefly for the topic
// to become usable.
//
// It returns the error from the CreateTopics call, if any. The readiness wait
// that follows is best-effort: nil is returned whether or not a probe ever
// succeeded.
func clientCreateTopic(client *kafka.Client, topic string, partitions int) error {
	config := kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     partitions,
		ReplicationFactor: 1,
	}
	createReq := &kafka.CreateTopicsRequest{Topics: []kafka.TopicConfig{config}}
	if _, err := client.CreateTopics(context.Background(), createReq); err != nil {
		return err
	}

	// Topic creation appears to be asynchronous: the controller knows the
	// partition layout before the other brokers have synced it, and requests
	// sent to partition leaders in that window fail with
	// "Error:[3] Unknown Topic Or Partition".
	//
	// Probe partition 0 with a fetch up to 20 times, pausing 100ms after each
	// failed attempt (about 2 seconds of waiting in total), and stop as soon
	// as a probe comes back without any error.
	const (
		maxAttempts = 20
		pause       = 100 * time.Millisecond
	)
	for attempt := 0; attempt < maxAttempts; attempt++ {
		res, err := client.Fetch(context.Background(), &kafka.FetchRequest{
			Topic:     topic,
			Partition: 0,
			Offset:    0,
		})
		if err != nil || res.Error != nil {
			time.Sleep(pause)
			continue
		}
		return nil
	}
	return nil
}
// newLocalClient builds a client through newClient using the TCP address
// "localhost", returning the client and the shutdown function newClient
// provides.
func newLocalClient() (*kafka.Client, func()) {
	addr := kafka.TCP("localhost")
	return newClient(addr)
}
// newClient builds a kafka.Client for addr with a 5 second timeout and a
// dedicated transport whose dial function goes through a
// ktesting.ConnWaitGroup.
//
// The second return value is a shutdown function: it closes the transport's
// idle connections and then calls Wait on the connection group (presumably
// blocking until the connections it dialed are closed; confirm against the
// ktesting package).
func newClient(addr net.Addr) (*kafka.Client, func()) {
	dialer := &net.Dialer{}
	conns := &ktesting.ConnWaitGroup{DialFunc: dialer.DialContext}

	transport := &kafka.Transport{
		Dial:     conns.Dial,
		Resolver: kafka.NewBrokerResolver(nil),
	}

	shutdown := func() {
		transport.CloseIdleConnections()
		conns.Wait()
	}

	client := &kafka.Client{
		Addr:      addr,
		Timeout:   5 * time.Second,
		Transport: transport,
	}
	return client, shutdown
}
|