File: describeacls.go

package info (click to toggle)
golang-github-segmentio-kafka-go 0.4.49%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,292 kB
  • sloc: sh: 17; makefile: 10
file content (107 lines) | stat: -rw-r--r-- 3,114 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package kafka

import (
	"context"
	"fmt"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol/describeacls"
)

// DescribeACLsRequest represents a request sent to a kafka broker to describe
// existing ACLs.
type DescribeACLsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// Filter to filter ACLs on.
	Filter ACLFilter
}

type ACLFilter struct {
	ResourceTypeFilter ResourceType
	ResourceNameFilter string
	// ResourcePatternTypeFilter was added in v1 and is not available prior to that.
	ResourcePatternTypeFilter PatternType
	PrincipalFilter           string
	HostFilter                string
	Operation                 ACLOperationType
	PermissionType            ACLPermissionType
}

// DescribeACLsResponse represents a response from a kafka broker to an ACL
// describe request.
type DescribeACLsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Error that occurred while attempting to describe
	// the ACLs.
	Error error

	// ACL resources returned from the describe request.
	Resources []ACLResource
}

type ACLResource struct {
	ResourceType ResourceType
	ResourceName string
	PatternType  PatternType
	ACLs         []ACLDescription
}

type ACLDescription struct {
	Principal      string
	Host           string
	Operation      ACLOperationType
	PermissionType ACLPermissionType
}

func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) {
	m, err := c.roundTrip(ctx, req.Addr, &describeacls.Request{
		Filter: describeacls.ACLFilter{
			ResourceTypeFilter:        int8(req.Filter.ResourceTypeFilter),
			ResourceNameFilter:        req.Filter.ResourceNameFilter,
			ResourcePatternTypeFilter: int8(req.Filter.ResourcePatternTypeFilter),
			PrincipalFilter:           req.Filter.PrincipalFilter,
			HostFilter:                req.Filter.HostFilter,
			Operation:                 int8(req.Filter.Operation),
			PermissionType:            int8(req.Filter.PermissionType),
		},
	})
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).DescribeACLs: %w", err)
	}

	res := m.(*describeacls.Response)
	resources := make([]ACLResource, len(res.Resources))

	for resourceIdx, respResource := range res.Resources {
		descriptions := make([]ACLDescription, len(respResource.ACLs))

		for descriptionIdx, respDescription := range respResource.ACLs {
			descriptions[descriptionIdx] = ACLDescription{
				Principal:      respDescription.Principal,
				Host:           respDescription.Host,
				Operation:      ACLOperationType(respDescription.Operation),
				PermissionType: ACLPermissionType(respDescription.PermissionType),
			}
		}

		resources[resourceIdx] = ACLResource{
			ResourceType: ResourceType(respResource.ResourceType),
			ResourceName: respResource.ResourceName,
			PatternType:  PatternType(respResource.PatternType),
			ACLs:         descriptions,
		}
	}

	ret := &DescribeACLsResponse{
		Throttle:  makeDuration(res.ThrottleTimeMs),
		Error:     makeError(res.ErrorCode, res.ErrorMessage),
		Resources: resources,
	}

	return ret, nil
}