1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
package kafka
import (
"context"
"net"
"github.com/segmentio/kafka-go/protocol/incrementalalterconfigs"
)
// ConfigOperation identifies how a single config entry should be altered by an
// IncrementalAlterConfigs request. It is converted to an int8 when the request
// is built.
type ConfigOperation int8

// Operation codes usable in IncrementalAlterConfigsRequestConfig.ConfigOperation.
//
// The numeric values are sent to the broker as-is, so the order of this list
// must not change. See the Kafka IncrementalAlterConfigs documentation for the
// exact semantics of each operation.
const (
	// ConfigOperationSet is operation code 0.
	ConfigOperationSet ConfigOperation = iota
	// ConfigOperationDelete is operation code 1.
	ConfigOperationDelete
	// ConfigOperationAppend is operation code 2.
	ConfigOperationAppend
	// ConfigOperationSubtract is operation code 3.
	ConfigOperationSubtract
)
// IncrementalAlterConfigsRequest is a request to the IncrementalAlterConfigs API.
//
// It is the input to (*Client).IncrementalAlterConfigs.
type IncrementalAlterConfigsRequest struct {
// Addr is the address of the kafka broker to send the request to.
Addr net.Addr
// Resources contains the list of resources to update configs for. Entries are
// forwarded to the broker in the order given here.
Resources []IncrementalAlterConfigsRequestResource
// ValidateOnly indicates whether Kafka should validate the changes without actually
// applying them.
ValidateOnly bool
}
// IncrementalAlterConfigsRequestResource contains the details of a single resource whose
// configs should be altered.
type IncrementalAlterConfigsRequestResource struct {
// ResourceType is the type of resource to update. It is converted to an int8
// when the request is built.
ResourceType ResourceType
// ResourceName is the name of the resource to update (i.e., topic name or broker ID).
ResourceName string
// Configs contains the list of config key/values to update on this resource.
// Entries are forwarded in the order given here.
Configs []IncrementalAlterConfigsRequestConfig
}
// IncrementalAlterConfigsRequestConfig describes a single config key/value pair that should
// be altered.
type IncrementalAlterConfigsRequestConfig struct {
// Name is the name of the config.
Name string
// Value is the value to apply to this config using ConfigOperation (for
// example, the value to set).
// NOTE(review): presumably unused for ConfigOperationDelete; confirm against
// the Kafka protocol documentation.
Value string
// ConfigOperation indicates how this config should be updated: one of
// ConfigOperationSet, ConfigOperationDelete, ConfigOperationAppend or
// ConfigOperationSubtract. The zero value is ConfigOperationSet.
ConfigOperation ConfigOperation
}
// IncrementalAlterConfigsResponse is a response from the IncrementalAlterConfigs API.
//
// It is returned by (*Client).IncrementalAlterConfigs.
type IncrementalAlterConfigsResponse struct {
// Resources contains one entry per resource result in the broker's response,
// in the order the broker returned them. It is nil when the response holds
// no results.
Resources []IncrementalAlterConfigsResponseResource
}
// IncrementalAlterConfigsResponseResource contains the response details for a single resource
// whose configs were updated.
type IncrementalAlterConfigsResponseResource struct {
// Error is set to a non-nil value if an error occurred while updating this specific
// resource. It is built from the error code and error message reported for
// this resource in the broker's response.
Error error
// ResourceType is the type of resource that was updated.
ResourceType ResourceType
// ResourceName is the name of the resource that was updated.
ResourceName string
}
// IncrementalAlterConfigs sends an IncrementalAlterConfigs request to the broker
// at req.Addr and returns the per-resource results.
//
// The returned error is non-nil only when the round trip itself fails; in that
// case the response is nil. Outcomes for individual resources are reported
// through the Error field of each entry in the response's Resources, which is
// derived from the error code and message the broker returned for that entry.
func (c *Client) IncrementalAlterConfigs(
	ctx context.Context,
	req *IncrementalAlterConfigsRequest,
) (*IncrementalAlterConfigsResponse, error) {
	// Translate the public request types into their protocol-level counterparts.
	// Slices are grown with append from nil so that empty inputs stay nil.
	var wireResources []incrementalalterconfigs.RequestResource
	for i := range req.Resources {
		src := &req.Resources[i]

		var wireConfigs []incrementalalterconfigs.RequestConfig
		for j := range src.Configs {
			cfg := &src.Configs[j]
			wireConfigs = append(wireConfigs, incrementalalterconfigs.RequestConfig{
				Name:            cfg.Name,
				Value:           cfg.Value,
				ConfigOperation: int8(cfg.ConfigOperation),
			})
		}

		wireResources = append(wireResources, incrementalalterconfigs.RequestResource{
			ResourceType: int8(src.ResourceType),
			ResourceName: src.ResourceName,
			Configs:      wireConfigs,
		})
	}

	wireReq := &incrementalalterconfigs.Request{
		Resources:    wireResources,
		ValidateOnly: req.ValidateOnly,
	}

	msg, err := c.roundTrip(ctx, req.Addr, wireReq)
	if err != nil {
		return nil, err
	}

	// Map every result in the protocol response back onto the public type.
	out := new(IncrementalAlterConfigsResponse)
	wireResp := msg.(*incrementalalterconfigs.Response)
	for i := range wireResp.Responses {
		r := &wireResp.Responses[i]
		out.Resources = append(out.Resources, IncrementalAlterConfigsResponseResource{
			Error:        makeError(r.ErrorCode, r.ErrorMessage),
			ResourceType: ResourceType(r.ResourceType),
			ResourceName: r.ResourceName,
		})
	}

	return out, nil
}
|