package describeconfigs
import (
"strconv"
"github.com/segmentio/kafka-go/protocol"
)
const (
	resourceTypeBroker int8 = 4
)
func init() {
	protocol.Register(&Request{}, &Response{})
}
// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_DescribeConfigs
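//
// Request is the DescribeConfigs request: it names the broker and/or topic
// resources whose configuration entries should be returned.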
type Request struct {
	Resources            []RequestResource `kafka:"min=v0,max=v3"`
	IncludeSynonyms      bool              `kafka:"min=v1,max=v3"`
	IncludeDocumentation bool              `kafka:"min=v3,max=v3"`
}
func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs }
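
// Broker returns the broker the request must be routed to: a request that
// names a broker resource is sent to that broker, anything else goes to the
// cluster controller.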
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
	// Broker metadata requests must be sent to the associated broker
	for _, resource := range r.Resources {
		if resource.ResourceType == resourceTypeBroker {
			brokerID, err := strconv.Atoi(resource.ResourceName)
			if err != nil {
				return protocol.Broker{}, err
			}
			return cluster.Brokers[int32(brokerID)], nil
		}
	}

	return cluster.Brokers[cluster.Controller], nil
}
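
// Split fans the request out so that each broker resource is sent to the
// broker it names, while the remaining resources are grouped into a single
// request; the partial responses are recombined by Response.Merge.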
func (r *Request) Split(cluster protocol.Cluster) (
	[]protocol.Message,
	protocol.Merger,
	error,
) {
	messages := []protocol.Message{}
	topicsMessage := Request{}

	for _, resource := range r.Resources {
		// Split out broker requests to separate brokers
		if resource.ResourceType == resourceTypeBroker {
			messages = append(messages, &Request{
				Resources: []RequestResource{resource},
			})
		} else {
			topicsMessage.Resources = append(
				topicsMessage.Resources, resource,
			)
		}
	}

	if len(topicsMessage.Resources) > 0 {
		messages = append(messages, &topicsMessage)
	}

	return messages, new(Response), nil
}
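
// RequestResource identifies a single resource (for example a broker or a
// topic) and, optionally, the subset of configuration names to describe;
// a nil ConfigNames requests all configuration entries.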
type RequestResource struct {
	ResourceType int8     `kafka:"min=v0,max=v3"`
	ResourceName string   `kafka:"min=v0,max=v3"`
	ConfigNames  []string `kafka:"min=v0,max=v3,nullable"`
}
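
// Response is the DescribeConfigs response, carrying one ResponseResource per
// requested resource.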
type Response struct {
	ThrottleTimeMs int32              `kafka:"min=v0,max=v3"`
	Resources      []ResponseResource `kafka:"min=v0,max=v3"`
}
func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeConfigs }
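
// Merge reassembles the responses to a split request by concatenating the
// resources returned in each partial response into a single response.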
func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
	protocol.Message,
	error,
) {
	response := &Response{}

	for _, result := range results {
		m, err := protocol.Result(result)
		if err != nil {
			return nil, err
		}
		response.Resources = append(
			response.Resources,
			m.(*Response).Resources...,
		)
	}

	return response, nil
}
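
// ResponseResource holds the configuration entries returned for one requested
// resource, or the error that prevented them from being described.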
type ResponseResource struct {
	ErrorCode     int16                 `kafka:"min=v0,max=v3"`
	ErrorMessage  string                `kafka:"min=v0,max=v3,nullable"`
	ResourceType  int8                  `kafka:"min=v0,max=v3"`
	ResourceName  string                `kafka:"min=v0,max=v3"`
	ConfigEntries []ResponseConfigEntry `kafka:"min=v0,max=v3"`
}
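
// ResponseConfigEntry describes a single configuration value of a resource,
// including where it comes from (ConfigSource) and whether it is read-only
// or sensitive.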
type ResponseConfigEntry struct {
	ConfigName          string                  `kafka:"min=v0,max=v3"`
	ConfigValue         string                  `kafka:"min=v0,max=v3,nullable"`
	ReadOnly            bool                    `kafka:"min=v0,max=v3"`
	IsDefault           bool                    `kafka:"min=v0,max=v0"`
	ConfigSource        int8                    `kafka:"min=v1,max=v3"`
	IsSensitive         bool                    `kafka:"min=v0,max=v3"`
	ConfigSynonyms      []ResponseConfigSynonym `kafka:"min=v1,max=v3"`
	ConfigType          int8                    `kafka:"min=v3,max=v3"`
	ConfigDocumentation string                  `kafka:"min=v3,max=v3,nullable"`
}
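
// ResponseConfigSynonym reports an alternative source and value for a
// configuration entry; synonyms are only returned when the request sets
// IncludeSynonyms (v1 and above, per the field tags).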
type ResponseConfigSynonym struct {
	ConfigName   string `kafka:"min=v1,max=v3"`
	ConfigValue  string `kafka:"min=v1,max=v3,nullable"`
	ConfigSource int8   `kafka:"min=v1,max=v3"`
}
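
// Compile-time check that Request implements the protocol.BrokerMessage
// interface used to route requests to a specific broker.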
var _ protocol.BrokerMessage = (*Request)(nil)