1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
package kafka
import (
"context"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
)
// AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.
type AlterPartitionReassignmentsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// Topic is the name of the topic to alter partitions in. Keep this field empty and use Topic in AlterPartitionReassignmentsRequestAssignment to
// reassign to multiple topics.
Topic string
// Assignments is the list of partition reassignments to submit to the API.
Assignments []AlterPartitionReassignmentsRequestAssignment
// Timeout is the amount of time to wait for the request to complete.
Timeout time.Duration
}
// AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single
// partition.
type AlterPartitionReassignmentsRequestAssignment struct {
// Topic is the name of the topic to alter partitions in. If empty, the value of Topic in AlterPartitionReassignmentsRequest is used.
Topic string
// PartitionID is the ID of the partition to make the reassignments in.
PartitionID int
// BrokerIDs is a slice of brokers to set the partition replicas to, or null to cancel a pending reassignment for this partition.
BrokerIDs []int
}
// AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.
type AlterPartitionReassignmentsResponse struct {
// Error is set to a non-nil value including the code and message if a top-level
// error was encountered when doing the update.
Error error
// PartitionResults contains the specific results for each partition.
PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
}
// AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of
// doing reassignments for a single partition.
type AlterPartitionReassignmentsResponsePartitionResult struct {
// Topic is the topic name.
Topic string
// PartitionID is the ID of the partition that was altered.
PartitionID int
// Error is set to a non-nil value including the code and message if an error was encountered
// during the update for this partition.
Error error
}
func (c *Client) AlterPartitionReassignments(
ctx context.Context,
req *AlterPartitionReassignmentsRequest,
) (*AlterPartitionReassignmentsResponse, error) {
apiTopicMap := make(map[string]*alterpartitionreassignments.RequestTopic)
for _, assignment := range req.Assignments {
topic := assignment.Topic
if topic == "" {
topic = req.Topic
}
apiTopic := apiTopicMap[topic]
if apiTopic == nil {
apiTopic = &alterpartitionreassignments.RequestTopic{
Name: topic,
}
apiTopicMap[topic] = apiTopic
}
replicas := []int32{}
for _, brokerID := range assignment.BrokerIDs {
replicas = append(replicas, int32(brokerID))
}
apiTopic.Partitions = append(
apiTopic.Partitions,
alterpartitionreassignments.RequestPartition{
PartitionIndex: int32(assignment.PartitionID),
Replicas: replicas,
},
)
}
apiReq := &alterpartitionreassignments.Request{
TimeoutMs: int32(req.Timeout.Milliseconds()),
}
for _, apiTopic := range apiTopicMap {
apiReq.Topics = append(apiReq.Topics, *apiTopic)
}
protoResp, err := c.roundTrip(
ctx,
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
apiResp := protoResp.(*alterpartitionreassignments.Response)
resp := &AlterPartitionReassignmentsResponse{
Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
}
for _, topicResult := range apiResp.Results {
for _, partitionResult := range topicResult.Partitions {
resp.PartitionResults = append(
resp.PartitionResults,
AlterPartitionReassignmentsResponsePartitionResult{
Topic: topicResult.Name,
PartitionID: int(partitionResult.PartitionIndex),
Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
},
)
}
}
return resp, nil
}
|