1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
package kafka
import (
"context"
"fmt"
"os"
"reflect"
"sort"
"testing"
"time"
)
func TestClientDescribeGroups(t *testing.T) {
if os.Getenv("KAFKA_VERSION") == "2.3.1" {
// There's a bug in 2.3.1 that causes the MemberMetadata to be in the wrong format and thus
// leads to an error when decoding the DescribeGroupsResponse.
//
// See https://issues.apache.org/jira/browse/KAFKA-9150 for details.
t.Skip("Skipping because kafka version is 2.3.1")
}
client, shutdown := newLocalClient()
defer shutdown()
topic := makeTopic()
gid := fmt.Sprintf("%s-test-group", topic)
createTopic(t, topic, 2)
defer deleteTopic(t, topic)
w := newTestWriter(WriterConfig{
Topic: topic,
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := w.WriteMessages(
ctx,
Message{
Key: []byte("key"),
Value: []byte("value"),
},
)
if err != nil {
t.Fatal(err)
}
r := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
GroupID: gid,
MinBytes: 10,
MaxBytes: 1000,
})
_, err = r.ReadMessage(ctx)
if err != nil {
t.Fatal(err)
}
resp, err := client.DescribeGroups(
ctx,
&DescribeGroupsRequest{
GroupIDs: []string{gid},
},
)
if err != nil {
t.Fatal(err)
}
if len(resp.Groups) != 1 {
t.Fatal(
"Unexpected number of groups returned",
"expected", 1,
"got", len(resp.Groups),
)
}
g := resp.Groups[0]
if g.Error != nil {
t.Error(
"Wrong error in group response",
"expected", nil,
"got", g.Error,
)
}
if g.GroupID != gid {
t.Error(
"Wrong groupID",
"expected", gid,
"got", g.GroupID,
)
}
if len(g.Members) != 1 {
t.Fatal(
"Wrong group members length",
"expected", 1,
"got", len(g.Members),
)
}
if len(g.Members[0].MemberAssignments.Topics) != 1 {
t.Fatal(
"Wrong topics length",
"expected", 1,
"got", len(g.Members[0].MemberAssignments.Topics),
)
}
mt := g.Members[0].MemberAssignments.Topics[0]
if mt.Topic != topic {
t.Error(
"Wrong member assignment topic",
"expected", topic,
"got", mt.Topic,
)
}
// Partitions can be in any order, sort them
sort.Slice(mt.Partitions, func(a, b int) bool {
return mt.Partitions[a] < mt.Partitions[b]
})
if !reflect.DeepEqual([]int{0, 1}, mt.Partitions) {
t.Error(
"Wrong member assignment partitions",
"expected", []int{0, 1},
"got", mt.Partitions,
)
}
}
|