package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/describeconfigs"
)
// DescribeConfigsRequest represents a request sent to a kafka broker to
// describe the configuration of one or more resources.
type DescribeConfigsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// List of resources to get configuration details for.
Resources []DescribeConfigRequestResource
// IncludeSynonyms is copied into the protocol request's field of the same
// name.
//
// Ignored if API version is less than v1.
IncludeSynonyms bool
// IncludeDocumentation is copied into the protocol request's field of the
// same name.
//
// Ignored if API version is less than v3.
IncludeDocumentation bool
}
// DescribeConfigRequestResource identifies a single resource whose
// configuration should be described, as part of a DescribeConfigsRequest.
type DescribeConfigRequestResource struct {
// Resource Type. Sent to the broker as an int8.
ResourceType ResourceType
// Resource Name
ResourceName string
// ConfigNames is a list of configurations to describe.
//
// NOTE(review): the Kafka protocol treats a null key list as "all
// configuration keys" — confirm that a nil slice is encoded as null here.
ConfigNames []string
}
// DescribeConfigsResponse represents a response from a kafka broker to a
// describe configs request.
type DescribeConfigsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// Resources holds one entry per resource present in the broker's response.
Resources []DescribeConfigResponseResource
}
// DescribeConfigResponseResource carries the describe result for a single
// resource: its identity, a per-resource error, and its configuration entries.
type DescribeConfigResponseResource struct {
// Resource Type, as the raw int8 value found in the broker's response.
ResourceType int8
// Resource Name
ResourceName string
// Error is built from the error code and error message reported by the
// broker for this resource.
// NOTE(review): presumably nil when the broker reported no error — verify
// against makeError.
Error error
// ConfigEntries lists the configuration entries returned for this resource.
ConfigEntries []DescribeConfigResponseConfigEntry
}
// DescribeConfigResponseConfigEntry describes a single configuration entry of
// a resource, as returned by the broker.
type DescribeConfigResponseConfigEntry struct {
// ConfigName is the name of the configuration entry.
ConfigName string
// ConfigValue is the value of the configuration entry.
ConfigValue string
// ReadOnly is the read-only flag reported by the broker for this entry.
ReadOnly bool
// Ignored if API version is greater than v0
IsDefault bool
// ConfigSource is the raw int8 source code reported by the broker.
//
// Ignored if API version is less than v1
ConfigSource int8
// IsSensitive is the sensitivity flag reported by the broker for this entry.
IsSensitive bool
// Ignored if API version is less than v1
ConfigSynonyms []DescribeConfigResponseConfigSynonym
// ConfigType is the raw int8 type code reported by the broker.
//
// Ignored if API version is less than v3
ConfigType int8
// Ignored if API version is less than v3
ConfigDocumentation string
}
// DescribeConfigResponseConfigSynonym describes one synonym of a
// configuration entry, as returned by the broker.
type DescribeConfigResponseConfigSynonym struct {
// ConfigName is the name of the synonym.
//
// Ignored if API version is less than v1
ConfigName string
// ConfigValue is the value of the synonym.
//
// Ignored if API version is less than v1
ConfigValue string
// ConfigSource is the raw int8 source code reported for the synonym.
//
// Ignored if API version is less than v1
ConfigSource int8
}
// DescribeConfigs sends a describe configs request to the kafka broker at
// req.Addr and returns the response.
//
// Each entry of req.Resources is translated into its protocol-level form and
// the whole request is sent in a single round trip. If the round trip fails,
// the error is wrapped and a nil response is returned. Errors reported by the
// broker for individual resources do not fail the call; they are exposed on
// the Error field of the corresponding response resource.
func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
	// Translate the public request resources into their wire-level form.
	wireResources := make([]describeconfigs.RequestResource, 0, len(req.Resources))
	for _, r := range req.Resources {
		wireResources = append(wireResources, describeconfigs.RequestResource{
			ResourceType: int8(r.ResourceType),
			ResourceName: r.ResourceName,
			ConfigNames:  r.ConfigNames,
		})
	}

	wireReq := &describeconfigs.Request{
		Resources:            wireResources,
		IncludeSynonyms:      req.IncludeSynonyms,
		IncludeDocumentation: req.IncludeDocumentation,
	}

	msg, err := c.roundTrip(ctx, req.Addr, wireReq)
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).DescribeConfigs: %w", err)
	}

	// Unchecked assertion: panics if roundTrip hands back any message type
	// other than *describeconfigs.Response.
	wireRes := msg.(*describeconfigs.Response)

	throttle := makeDuration(wireRes.ThrottleTimeMs)

	// Convert the wire-level response back into the public types, walking
	// resources -> config entries -> synonyms.
	described := make([]DescribeConfigResponseResource, 0, len(wireRes.Resources))
	for _, r := range wireRes.Resources {
		entries := make([]DescribeConfigResponseConfigEntry, 0, len(r.ConfigEntries))
		for _, e := range r.ConfigEntries {
			synonyms := make([]DescribeConfigResponseConfigSynonym, 0, len(e.ConfigSynonyms))
			for _, s := range e.ConfigSynonyms {
				synonyms = append(synonyms, DescribeConfigResponseConfigSynonym{
					ConfigName:   s.ConfigName,
					ConfigValue:  s.ConfigValue,
					ConfigSource: s.ConfigSource,
				})
			}

			entries = append(entries, DescribeConfigResponseConfigEntry{
				ConfigName:          e.ConfigName,
				ConfigValue:         e.ConfigValue,
				ReadOnly:            e.ReadOnly,
				ConfigSource:        e.ConfigSource,
				IsDefault:           e.IsDefault,
				IsSensitive:         e.IsSensitive,
				ConfigSynonyms:      synonyms,
				ConfigType:          e.ConfigType,
				ConfigDocumentation: e.ConfigDocumentation,
			})
		}

		described = append(described, DescribeConfigResponseResource{
			ResourceType:  r.ResourceType,
			ResourceName:  r.ResourceName,
			Error:         makeError(r.ErrorCode, r.ErrorMessage),
			ConfigEntries: entries,
		})
	}

	return &DescribeConfigsResponse{
		Throttle:  throttle,
		Resources: described,
	}, nil
}
|