1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/txnoffsetcommit"
)
// TxnOffsetCommitRequest represents a request sent to a kafka broker to commit
// offsets for a partition within a transaction.
type TxnOffsetCommitRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// The transactional id key.
TransactionalID string
// ID of the consumer group to publish the offsets for.
GroupID string
// The Producer ID (PID) for the current producer session;
// received from an InitProducerID request.
ProducerID int
// The epoch associated with the current producer session for the given PID
ProducerEpoch int
// GenerationID is the current generation for the group.
GenerationID int
// ID of the group member submitting the offsets.
MemberID string
// GroupInstanceID is a unique identifier for the consumer.
GroupInstanceID string
// Set of topic partitions to publish the offsets for.
//
// Not that offset commits need to be submitted to the broker acting as the
// group coordinator. This will be automatically resolved by the transport.
Topics map[string][]TxnOffsetCommit
}
// TxnOffsetCommit represent the commit of an offset to a partition within a transaction.
//
// The extra metadata is opaque to the kafka protocol, it is intended to hold
// information like an identifier for the process that committed the offset,
// or the time at which the commit was made.
type TxnOffsetCommit struct {
Partition int
Offset int64
Metadata string
}
// TxnOffsetFetchResponse represents a response from a kafka broker to an offset
// commit request within a transaction.
type TxnOffsetCommitResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// Set of topic partitions that the kafka broker has accepted offset commits
// for.
Topics map[string][]TxnOffsetCommitPartition
}
// TxnOffsetFetchPartition represents the state of a single partition in responses
// to committing offsets within a transaction.
type TxnOffsetCommitPartition struct {
// ID of the partition.
Partition int
// An error that may have occurred while attempting to publish consumer
// group offsets for this partition.
//
// The error contains both the kafka error code, and an error message
// returned by the kafka broker. Programs may use the standard errors.Is
// function to test the error against kafka error codes.
Error error
}
// TxnOffsetCommit sends an txn offset commit request to a kafka broker and returns the
// response.
func (c *Client) TxnOffsetCommit(
ctx context.Context,
req *TxnOffsetCommitRequest,
) (*TxnOffsetCommitResponse, error) {
protoReq := &txnoffsetcommit.Request{
TransactionalID: req.TransactionalID,
GroupID: req.GroupID,
ProducerID: int64(req.ProducerID),
ProducerEpoch: int16(req.ProducerEpoch),
GenerationID: int32(req.GenerationID),
MemberID: req.MemberID,
GroupInstanceID: req.GroupInstanceID,
Topics: make([]txnoffsetcommit.RequestTopic, 0, len(req.Topics)),
}
for topic, partitions := range req.Topics {
parts := make([]txnoffsetcommit.RequestPartition, len(partitions))
for i, partition := range partitions {
parts[i] = txnoffsetcommit.RequestPartition{
Partition: int32(partition.Partition),
CommittedOffset: int64(partition.Offset),
CommittedMetadata: partition.Metadata,
}
}
t := txnoffsetcommit.RequestTopic{
Name: topic,
Partitions: parts,
}
protoReq.Topics = append(protoReq.Topics, t)
}
m, err := c.roundTrip(ctx, req.Addr, protoReq)
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).TxnOffsetCommit: %w", err)
}
r := m.(*txnoffsetcommit.Response)
res := &TxnOffsetCommitResponse{
Throttle: makeDuration(r.ThrottleTimeMs),
Topics: make(map[string][]TxnOffsetCommitPartition, len(r.Topics)),
}
for _, topic := range r.Topics {
partitions := make([]TxnOffsetCommitPartition, 0, len(topic.Partitions))
for _, partition := range topic.Partitions {
partitions = append(partitions, TxnOffsetCommitPartition{
Partition: int(partition.Partition),
Error: makeError(partition.ErrorCode, ""),
})
}
res.Topics[topic.Name] = partitions
}
return res, nil
}
|