1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/alterclientquotas"
)
// AlterClientQuotasRequest represents a request sent to a kafka broker to
// alter client quotas.
type AlterClientQuotasRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// List of client quotas entries to alter.
Entries []AlterClientQuotaEntry
// Whether the alteration should be validated, but not performed.
ValidateOnly bool
}
type AlterClientQuotaEntry struct {
// The quota entities to alter.
Entities []AlterClientQuotaEntity
// An individual quota configuration entry to alter.
Ops []AlterClientQuotaOps
}
type AlterClientQuotaEntity struct {
// The quota entity type.
EntityType string
// The name of the quota entity, or null if the default.
EntityName string
}
type AlterClientQuotaOps struct {
// The quota configuration key.
Key string
// The quota configuration value to set, otherwise ignored if the value is to be removed.
Value float64
// Whether the quota configuration value should be removed, otherwise set.
Remove bool
}
type AlterClientQuotaResponseQuotas struct {
// Error is set to a non-nil value including the code and message if a top-level
// error was encountered when doing the update.
Error error
// The altered quota entities.
Entities []AlterClientQuotaEntity
}
// AlterClientQuotasResponse represents a response from a kafka broker to an alter client
// quotas request.
type AlterClientQuotasResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// List of altered client quotas responses.
Entries []AlterClientQuotaResponseQuotas
}
// AlterClientQuotas sends client quotas alteration request to a kafka broker and returns
// the response.
func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) {
entries := make([]alterclientquotas.Entry, len(req.Entries))
for entryIdx, entry := range req.Entries {
entities := make([]alterclientquotas.Entity, len(entry.Entities))
for entityIdx, entity := range entry.Entities {
entities[entityIdx] = alterclientquotas.Entity{
EntityType: entity.EntityType,
EntityName: entity.EntityName,
}
}
ops := make([]alterclientquotas.Ops, len(entry.Ops))
for opsIdx, op := range entry.Ops {
ops[opsIdx] = alterclientquotas.Ops{
Key: op.Key,
Value: op.Value,
Remove: op.Remove,
}
}
entries[entryIdx] = alterclientquotas.Entry{
Entities: entities,
Ops: ops,
}
}
m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{
Entries: entries,
ValidateOnly: req.ValidateOnly,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err)
}
res := m.(*alterclientquotas.Response)
responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results))
for responseEntryIdx, responseEntry := range res.Results {
responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities))
for responseEntityIdx, responseEntity := range responseEntry.Entities {
responseEntities[responseEntityIdx] = AlterClientQuotaEntity{
EntityType: responseEntity.EntityType,
EntityName: responseEntity.EntityName,
}
}
responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{
Error: makeError(responseEntry.ErrorCode, responseEntry.ErrorMessage),
Entities: responseEntities,
}
}
ret := &AlterClientQuotasResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Entries: responseEntries,
}
return ret, nil
}
|