1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
package kafka
import (
"context"
"errors"
"net"
"strconv"
"testing"
"time"
ktesting "github.com/segmentio/kafka-go/testing"
)
func TestClientInitProducerId(t *testing.T) {
if !ktesting.KafkaIsAtLeast("0.11.0") {
return
}
// TODO: look into why this test fails on Kafka 3.0.0 and higher when transactional support
// work is revisited.
if ktesting.KafkaIsAtLeast("3.0.0") {
t.Skip("Skipping test because it fails on Kafka version 3.0.0 or higher.")
}
client, shutdown := newLocalClient()
defer shutdown()
tid := makeTransactionalID()
// Wait for kafka setup and Coordinator to be available.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
respc, err := waitForCoordinatorIndefinitely(ctx, client, &FindCoordinatorRequest{
Addr: client.Addr,
Key: tid,
KeyType: CoordinatorKeyTypeTransaction,
})
if err != nil {
t.Fatal(err)
}
// Now establish a connection with the transaction coordinator
transactionCoordinator := TCP(net.JoinHostPort(respc.Coordinator.Host, strconv.Itoa(int(respc.Coordinator.Port))))
client, shutdown = newClient(transactionCoordinator)
defer shutdown()
// Check if producer epoch increases and PID remains the same when producer is
// initialized again with the same transactionalID
resp, err := client.InitProducerID(context.Background(), &InitProducerIDRequest{
Addr: transactionCoordinator,
TransactionalID: tid,
TransactionTimeoutMs: 30000,
})
if err != nil {
t.Fatal(err)
}
if resp.Error != nil {
t.Fatal(resp.Error)
}
epoch1 := resp.Producer.ProducerEpoch
pid1 := resp.Producer.ProducerID
resp, err = client.InitProducerID(context.Background(), &InitProducerIDRequest{
Addr: transactionCoordinator,
TransactionalID: tid,
TransactionTimeoutMs: 30000,
ProducerID: pid1,
ProducerEpoch: epoch1,
})
if err != nil {
t.Fatal(err)
}
if resp.Error != nil {
t.Fatal(resp.Error)
}
epoch2 := resp.Producer.ProducerEpoch
pid2 := resp.Producer.ProducerID
if pid1 != pid2 {
t.Fatalf("PID should stay the same across producer sessions; expected: %v got: %v", pid1, pid2)
}
if epoch2-epoch1 <= 0 {
t.Fatal("Epoch should increase when producer is initialized again with the same transactionID")
}
// Checks if transaction timeout is too high
// Transaction timeout should never be higher than broker config `transaction.max.timeout.ms`
resp, _ = client.InitProducerID(context.Background(), &InitProducerIDRequest{
Addr: client.Addr,
TransactionalID: tid,
TransactionTimeoutMs: 30000000,
})
if !errors.Is(resp.Error, InvalidTransactionTimeout) {
t.Fatal("Should have errored with: Transaction timeout specified is higher than `transaction.max.timeout.ms`")
}
}
|