1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
package kafka
import (
"context"
"log"
"os"
"testing"
"time"
ktesting "github.com/segmentio/kafka-go/testing"
)
func TestClientAddOffsetsToTxn(t *testing.T) {
if !ktesting.KafkaIsAtLeast("0.11.0") {
t.Skip("Skipping test because kafka version is not high enough.")
}
// TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
// work is revisited.
if ktesting.KafkaIsAtLeast("3.0.0") {
t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
}
topic := makeTopic()
transactionalID := makeTransactionalID()
client, shutdown := newLocalClient()
defer shutdown()
err := clientCreateTopic(client, topic, 3)
defer deleteTopic(t, topic)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
waitForTopic(ctx, t, topic)
defer cancel()
respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
Addr: client.Addr,
Key: transactionalID,
KeyType: CoordinatorKeyTypeConsumer,
})
if err != nil {
t.Fatal(err)
}
if respc.Error != nil {
t.Fatal(err)
}
groupID := makeGroupID()
group, err := NewConsumerGroup(ConsumerGroupConfig{
ID: groupID,
Topics: []string{topic},
Brokers: []string{"localhost:9092"},
HeartbeatInterval: 2 * time.Second,
RebalanceTimeout: 2 * time.Second,
RetentionTime: time.Hour,
Logger: log.New(os.Stdout, "cg-test: ", 0),
})
if err != nil {
t.Fatal(err)
}
defer group.Close()
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
_, err = group.Next(ctx)
if err != nil {
t.Fatal(err)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
respc, err = waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
Addr: client.Addr,
Key: transactionalID,
KeyType: CoordinatorKeyTypeTransaction,
})
if err != nil {
t.Fatal(err)
}
if respc.Error != nil {
t.Fatal(err)
}
ipResp, err := client.InitProducerID(ctx, &InitProducerIDRequest{
TransactionalID: transactionalID,
TransactionTimeoutMs: 10000,
})
if err != nil {
t.Fatal(err)
}
if ipResp.Error != nil {
t.Fatal(ipResp.Error)
}
defer func() {
err := clientEndTxn(client, &EndTxnRequest{
TransactionalID: transactionalID,
ProducerID: ipResp.Producer.ProducerID,
ProducerEpoch: ipResp.Producer.ProducerEpoch,
Committed: false,
})
if err != nil {
t.Fatal(err)
}
}()
ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
resp, err := client.AddOffsetsToTxn(ctx, &AddOffsetsToTxnRequest{
TransactionalID: transactionalID,
ProducerID: ipResp.Producer.ProducerID,
ProducerEpoch: ipResp.Producer.ProducerEpoch,
GroupID: groupID,
})
if err != nil {
t.Fatal(err)
}
if resp.Error != nil {
t.Fatal(err)
}
}
|