1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
package kafka
import (
"bufio"
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/findcoordinator"
)
// CoordinatorKeyType is used to specify the type of coordinator to look for.
type CoordinatorKeyType int8
const (
// CoordinatorKeyTypeConsumer type is used when looking for a Group coordinator.
CoordinatorKeyTypeConsumer CoordinatorKeyType = 0
// CoordinatorKeyTypeTransaction type is used when looking for a Transaction coordinator.
CoordinatorKeyTypeTransaction CoordinatorKeyType = 1
)
// FindCoordinatorRequest is the request structure for the FindCoordinator function.
type FindCoordinatorRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// The coordinator key.
Key string
// The coordinator key type. (Group, transaction, etc.)
KeyType CoordinatorKeyType
}
// FindCoordinatorResponseCoordinator contains details about the found coordinator.
type FindCoordinatorResponseCoordinator struct {
// NodeID holds the broker id.
NodeID int
// Host of the broker
Host string
// Port on which broker accepts requests
Port int
}
// FindCoordinatorResponse is the response structure for the FindCoordinator function.
type FindCoordinatorResponse struct {
// The Transaction/Group Coordinator details
Coordinator *FindCoordinatorResponseCoordinator
// The amount of time that the broker throttled the request.
Throttle time.Duration
// An error that may have occurred while attempting to retrieve Coordinator
//
// The error contains both the kafka error code, and an error message
// returned by the kafka broker.
Error error
}
// FindCoordinator sends a findCoordinator request to a kafka broker and returns the
// response.
func (c *Client) FindCoordinator(ctx context.Context, req *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
m, err := c.roundTrip(ctx, req.Addr, &findcoordinator.Request{
Key: req.Key,
KeyType: int8(req.KeyType),
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).FindCoordinator: %w", err)
}
res := m.(*findcoordinator.Response)
coordinator := &FindCoordinatorResponseCoordinator{
NodeID: int(res.NodeID),
Host: res.Host,
Port: int(res.Port),
}
ret := &FindCoordinatorResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Error: makeError(res.ErrorCode, res.ErrorMessage),
Coordinator: coordinator,
}
return ret, nil
}
// FindCoordinatorRequestV0 requests the coordinator for the specified group or transaction
//
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
type findCoordinatorRequestV0 struct {
// CoordinatorKey holds id to use for finding the coordinator (for groups, this is
// the groupId, for transactional producers, this is the transactional id)
CoordinatorKey string
}
func (t findCoordinatorRequestV0) size() int32 {
return sizeofString(t.CoordinatorKey)
}
func (t findCoordinatorRequestV0) writeTo(wb *writeBuffer) {
wb.writeString(t.CoordinatorKey)
}
type findCoordinatorResponseCoordinatorV0 struct {
// NodeID holds the broker id.
NodeID int32
// Host of the broker
Host string
// Port on which broker accepts requests
Port int32
}
func (t findCoordinatorResponseCoordinatorV0) size() int32 {
return sizeofInt32(t.NodeID) +
sizeofString(t.Host) +
sizeofInt32(t.Port)
}
func (t findCoordinatorResponseCoordinatorV0) writeTo(wb *writeBuffer) {
wb.writeInt32(t.NodeID)
wb.writeString(t.Host)
wb.writeInt32(t.Port)
}
func (t *findCoordinatorResponseCoordinatorV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt32(r, size, &t.NodeID); err != nil {
return
}
if remain, err = readString(r, remain, &t.Host); err != nil {
return
}
if remain, err = readInt32(r, remain, &t.Port); err != nil {
return
}
return
}
type findCoordinatorResponseV0 struct {
// ErrorCode holds response error code
ErrorCode int16
// Coordinator holds host and port information for the coordinator
Coordinator findCoordinatorResponseCoordinatorV0
}
func (t findCoordinatorResponseV0) size() int32 {
return sizeofInt16(t.ErrorCode) +
t.Coordinator.size()
}
func (t findCoordinatorResponseV0) writeTo(wb *writeBuffer) {
wb.writeInt16(t.ErrorCode)
t.Coordinator.writeTo(wb)
}
func (t *findCoordinatorResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
return
}
if remain, err = (&t.Coordinator).readFrom(r, remain); err != nil {
return
}
return
}
|