File: deleteacls.go

package info (click to toggle)
golang-github-segmentio-kafka-go 0.4.49%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,292 kB
  • sloc: sh: 17; makefile: 10
file content (114 lines) | stat: -rw-r--r-- 3,423 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package kafka

import (
	"context"
	"fmt"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol/deleteacls"
)

// DeleteACLsRequest represents a request sent to a kafka broker to delete
// ACLs.
//
// It is the input of (*Client).DeleteACLs, which forwards Addr to the client's
// round trip and converts every entry of Filters into a protocol-level filter.
type DeleteACLsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of ACL filters to use for deletion. The filters are sent in the
	// order in which they appear in this slice.
	Filters []DeleteACLsFilter
}

// DeleteACLsFilter describes one filter of a DeleteACLsRequest.
//
// (*Client).DeleteACLs copies each field into the field of the same name of
// the protocol-level request filter: the string fields (ResourceNameFilter,
// PrincipalFilter, HostFilter) are sent unchanged, and the typed fields
// (ResourceTypeFilter, ResourcePatternTypeFilter, Operation, PermissionType)
// are converted to int8.
//
// NOTE(review): how the broker interprets each field when selecting ACLs is
// not visible in this file — confirm against the Kafka DeleteAcls API.
type DeleteACLsFilter struct {
	ResourceTypeFilter        ResourceType
	ResourceNameFilter        string
	ResourcePatternTypeFilter PatternType
	PrincipalFilter           string
	HostFilter                string
	Operation                 ACLOperationType
	PermissionType            ACLPermissionType
}

// DeleteACLsResponse represents a response from a kafka broker to an ACL
// deletion request.
//
// It is built by (*Client).DeleteACLs from the protocol-level response.
type DeleteACLsResponse struct {
	// The amount of time that the broker throttled the request. Derived from
	// the throttle time, in milliseconds, carried by the response.
	Throttle time.Duration

	// List of the results from the deletion request: one entry per filter
	// result present in the response, in the order of the response.
	Results []DeleteACLsResult
}

// DeleteACLsResult holds one filter result of a DeleteACLsResponse.
//
// Error is built from the error code and error message of the filter result;
// presumably it is nil when the broker reported no error — confirm against
// makeError. MatchingACLs lists the ACLs attached to that filter result, in
// the order of the response.
type DeleteACLsResult struct {
	Error        error
	MatchingACLs []DeleteACLsMatchingACLs
}

// DeleteACLsMatchingACLs describes a single ACL reported inside a
// DeleteACLsResult.
//
// Error is built from the error code and error message attached to this ACL in
// the response; presumably it is nil when no error was reported — confirm
// against makeError. The remaining fields are copied from the fields of the
// same name in the response, with the numeric ones converted to their typed
// equivalents (ResourceType, PatternType, ACLOperationType, ACLPermissionType).
type DeleteACLsMatchingACLs struct {
	Error               error
	ResourceType        ResourceType
	ResourceName        string
	ResourcePatternType PatternType
	Principal           string
	Host                string
	Operation           ACLOperationType
	PermissionType      ACLPermissionType
}

// DeleteACLs sends an ACL deletion request to a kafka broker and returns the
// response.
//
// Every entry of req.Filters is translated into its protocol-level
// counterpart and the resulting request is sent to req.Addr. A non-nil error
// is returned when the round trip fails or when the message handed back by
// the transport is not a delete ACLs response; in both cases the returned
// response is nil.
//
// Error codes carried inside the response are not returned as the method
// error: they are exposed through the Error field of each entry of Results
// and of each matching ACL.
func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) {
	filters := make([]deleteacls.RequestFilter, 0, len(req.Filters))

	for _, filter := range req.Filters {
		filters = append(filters, deleteacls.RequestFilter{
			ResourceTypeFilter:        int8(filter.ResourceTypeFilter),
			ResourceNameFilter:        filter.ResourceNameFilter,
			ResourcePatternTypeFilter: int8(filter.ResourcePatternTypeFilter),
			PrincipalFilter:           filter.PrincipalFilter,
			HostFilter:                filter.HostFilter,
			Operation:                 int8(filter.Operation),
			PermissionType:            int8(filter.PermissionType),
		})
	}

	m, err := c.roundTrip(ctx, req.Addr, &deleteacls.Request{
		Filters: filters,
	})
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: %w", err)
	}

	// Use the checked form of the type assertion so that an unexpected
	// message type surfaces as an error to the caller instead of a panic.
	res, ok := m.(*deleteacls.Response)
	if !ok {
		return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: unexpected response type %T", m)
	}

	results := make([]DeleteACLsResult, 0, len(res.FilterResults))

	for _, result := range res.FilterResults {
		matchingACLs := make([]DeleteACLsMatchingACLs, 0, len(result.MatchingACLs))

		for _, matchingACL := range result.MatchingACLs {
			matchingACLs = append(matchingACLs, DeleteACLsMatchingACLs{
				Error:               makeError(matchingACL.ErrorCode, matchingACL.ErrorMessage),
				ResourceType:        ResourceType(matchingACL.ResourceType),
				ResourceName:        matchingACL.ResourceName,
				ResourcePatternType: PatternType(matchingACL.ResourcePatternType),
				Principal:           matchingACL.Principal,
				Host:                matchingACL.Host,
				Operation:           ACLOperationType(matchingACL.Operation),
				PermissionType:      ACLPermissionType(matchingACL.PermissionType),
			})
		}

		results = append(results, DeleteACLsResult{
			Error:        makeError(result.ErrorCode, result.ErrorMessage),
			MatchingACLs: matchingACLs,
		})
	}

	return &DeleteACLsResponse{
		Throttle: makeDuration(res.ThrottleTimeMs),
		Results:  results,
	}, nil
}