1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
package kafka
import (
"context"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/listpartitionreassignments"
)
// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API.
//
// It is the input of (*Client).ListPartitionReassignments.
type ListPartitionReassignmentsRequest struct {
	// Addr is the address of the kafka broker to send the request to.
	Addr net.Addr
	// Topics holds the topics we want reassignments for, keyed by topic name,
	// or nil to list everything.
	Topics map[string]ListPartitionReassignmentsRequestTopic
	// Timeout is the amount of time to wait for the request to complete.
	// It is forwarded to the broker as a whole number of milliseconds
	// converted to int32.
	Timeout time.Duration
}
// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single
// topic. It is used as the value type of ListPartitionReassignmentsRequest.Topics,
// where the map key carries the topic name.
type ListPartitionReassignmentsRequestTopic struct {
	// PartitionIndexes holds the partitions to list partition reassignments for.
	// Each index is converted to int32 when the request is built.
	PartitionIndexes []int
}
// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API.
//
// It is the result of (*Client).ListPartitionReassignments.
type ListPartitionReassignmentsResponse struct {
	// Error is set to a non-nil value including the code and message if a top-level
	// error was encountered. It is derived from the error code and error message
	// carried by the broker response.
	Error error
	// Topics contains results for each topic, mapped by their name.
	// ListPartitionReassignments always initializes this map, so it is non-nil
	// even when the broker reports no topics.
	Topics map[string]ListPartitionReassignmentsResponseTopic
}
// ListPartitionReassignmentsResponseTopic contains the detailed result of
// ongoing reassignments for a topic. It is used as the value type of
// ListPartitionReassignmentsResponse.Topics, where the map key carries the topic name.
type ListPartitionReassignmentsResponseTopic struct {
	// Partitions contains the results for the topic's partitions, one entry per
	// partition result in the broker response. It is nil when the response
	// carries no partition results for the topic.
	Partitions []ListPartitionReassignmentsResponsePartition
}
// ListPartitionReassignmentsResponsePartition contains the detailed result of
// ongoing reassignments for a single partition.
//
// The replica slices are converted element by element from the int32 values
// in the broker response; a nil slice in the response stays nil here.
type ListPartitionReassignmentsResponsePartition struct {
	// PartitionIndex contains the index of the partition.
	PartitionIndex int
	// Replicas contains the current replica set.
	Replicas []int
	// AddingReplicas contains the set of replicas we are currently adding.
	AddingReplicas []int
	// RemovingReplicas contains the set of replicas we are currently removing.
	RemovingReplicas []int
}
// ListPartitionReassignments sends a ListPartitionReassignments request to the
// broker at req.Addr and converts the reply into a
// ListPartitionReassignmentsResponse.
//
// req.Timeout is forwarded as milliseconds converted to int32. Every entry of
// req.Topics becomes one topic in the wire request; when req.Topics is nil or
// empty the wire request's topic list is left nil.
//
// A transport failure from the round trip is returned as (nil, err). A
// top-level error reported by the broker is not returned as err; it is stored
// in the Error field of the returned response instead.
func (c *Client) ListPartitionReassignments(
	ctx context.Context,
	req *ListPartitionReassignmentsRequest,
) (*ListPartitionReassignmentsResponse, error) {
	wireReq := &listpartitionreassignments.Request{}
	wireReq.TimeoutMs = int32(req.Timeout.Milliseconds())

	// Map iteration order is unspecified, so topics are appended in no
	// particular order. Appending (rather than pre-allocating) keeps the
	// topic list nil when there is nothing to add.
	for name, topic := range req.Topics {
		entry := listpartitionreassignments.RequestTopic{
			Name:             name,
			PartitionIndexes: intToInt32Array(topic.PartitionIndexes),
		}
		wireReq.Topics = append(wireReq.Topics, entry)
	}

	raw, err := c.roundTrip(ctx, req.Addr, wireReq)
	if err != nil {
		return nil, err
	}

	// Unchecked assertion: the round trip is expected to yield the response
	// type matching the request that was sent.
	wireResp := raw.(*listpartitionreassignments.Response)

	out := &ListPartitionReassignmentsResponse{
		Error:  makeError(wireResp.ErrorCode, wireResp.ErrorMessage),
		Topics: make(map[string]ListPartitionReassignmentsResponseTopic),
	}

	for i := range wireResp.Topics {
		wireTopic := wireResp.Topics[i]

		// Starts with a nil Partitions slice; it stays nil if the topic
		// carries no partition results.
		var converted ListPartitionReassignmentsResponseTopic
		for j := range wireTopic.Partitions {
			wirePartition := wireTopic.Partitions[j]
			partition := ListPartitionReassignmentsResponsePartition{
				PartitionIndex:   int(wirePartition.PartitionIndex),
				Replicas:         int32ToIntArray(wirePartition.Replicas),
				AddingReplicas:   int32ToIntArray(wirePartition.AddingReplicas),
				RemovingReplicas: int32ToIntArray(wirePartition.RemovingReplicas),
			}
			converted.Partitions = append(converted.Partitions, partition)
		}

		// A repeated topic name overwrites the earlier entry.
		out.Topics[wireTopic.Name] = converted
	}

	return out, nil
}
// intToInt32Array returns a new slice holding every element of arr converted
// to int32 with a plain Go conversion (values outside the int32 range are not
// rejected). A nil input yields a nil result; a non-nil empty input yields a
// non-nil empty result.
func intToInt32Array(arr []int) []int32 {
	var converted []int32
	if arr != nil {
		converted = make([]int32, 0, len(arr))
		for _, v := range arr {
			converted = append(converted, int32(v))
		}
	}
	return converted
}
// int32ToIntArray returns a new slice holding every element of arr converted
// to int. A nil input yields a nil result; a non-nil empty input yields a
// non-nil empty result.
func int32ToIntArray(arr []int32) []int {
	if arr == nil {
		return nil
	}

	widened := make([]int, 0, len(arr))
	for _, v := range arr {
		widened = append(widened, int(v))
	}
	return widened
}
|