1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/describeacls"
)
// DescribeACLsRequest represents a request sent to a kafka broker to describe
// existing ACLs.
type DescribeACLsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// Filter to filter ACLs on.
Filter ACLFilter
}
type ACLFilter struct {
ResourceTypeFilter ResourceType
ResourceNameFilter string
// ResourcePatternTypeFilter was added in v1 and is not available prior to that.
ResourcePatternTypeFilter PatternType
PrincipalFilter string
HostFilter string
Operation ACLOperationType
PermissionType ACLPermissionType
}
// DescribeACLsResponse represents a response from a kafka broker to an ACL
// describe request.
type DescribeACLsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// Error that occurred while attempting to describe
// the ACLs.
Error error
// ACL resources returned from the describe request.
Resources []ACLResource
}
type ACLResource struct {
ResourceType ResourceType
ResourceName string
PatternType PatternType
ACLs []ACLDescription
}
type ACLDescription struct {
Principal string
Host string
Operation ACLOperationType
PermissionType ACLPermissionType
}
func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) {
m, err := c.roundTrip(ctx, req.Addr, &describeacls.Request{
Filter: describeacls.ACLFilter{
ResourceTypeFilter: int8(req.Filter.ResourceTypeFilter),
ResourceNameFilter: req.Filter.ResourceNameFilter,
ResourcePatternTypeFilter: int8(req.Filter.ResourcePatternTypeFilter),
PrincipalFilter: req.Filter.PrincipalFilter,
HostFilter: req.Filter.HostFilter,
Operation: int8(req.Filter.Operation),
PermissionType: int8(req.Filter.PermissionType),
},
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DescribeACLs: %w", err)
}
res := m.(*describeacls.Response)
resources := make([]ACLResource, len(res.Resources))
for resourceIdx, respResource := range res.Resources {
descriptions := make([]ACLDescription, len(respResource.ACLs))
for descriptionIdx, respDescription := range respResource.ACLs {
descriptions[descriptionIdx] = ACLDescription{
Principal: respDescription.Principal,
Host: respDescription.Host,
Operation: ACLOperationType(respDescription.Operation),
PermissionType: ACLPermissionType(respDescription.PermissionType),
}
}
resources[resourceIdx] = ACLResource{
ResourceType: ResourceType(respResource.ResourceType),
ResourceName: respResource.ResourceName,
PatternType: PatternType(respResource.PatternType),
ACLs: descriptions,
}
}
ret := &DescribeACLsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Error: makeError(res.ErrorCode, res.ErrorMessage),
Resources: resources,
}
return ret, nil
}
|