1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
|
# -*- coding: utf-8 -*-
# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import pytest
from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType,
ResourcePatternType, AclOperation, AclPermissionType)
from confluent_kafka.error import ConsumeError
from confluent_kafka import ConsumerGroupState, TopicCollection
from tests.common import TestUtils
# Prefix handed to create_topic_and_wait_propogation() when the test topic is created.
topic_prefix = "test-topic"
def verify_commit_result(err, _):
    """Commit callback installed as ``on_commit`` by ``consume_messages``.

    Asserts that the reported ``err`` is not ``None``; the second
    positional argument is ignored.
    """
    # NOTE(review): a success check would usually read ``err is None``;
    # the original condition is kept unchanged -- confirm the intent.
    assert err is not None
def consume_messages(sasl_cluster, group_id, topic, num_messages=None):
    """Consume messages from ``topic``, committing each one synchronously.

    Args:
        sasl_cluster: Cluster fixture; only its ``consumer(conf)`` factory
            is used here.
        group_id: Value used for the consumer's ``group.id``.
        topic: Topic name to subscribe to.
        num_messages: Stop after this many messages have been read and
            committed. With ``None`` the loop only ends when a
            ``ConsumeError`` is caught or another exception propagates.

    Raises:
        Exception: If ``poll()`` returns ``None`` even though it was
            called without a timeout.
    """
    conf = {'group.id': group_id,
            'session.timeout.ms': 6000,
            'enable.auto.commit': False,
            'on_commit': verify_commit_result,
            'auto.offset.reset': 'earliest'}
    consumer = sasl_cluster.consumer(conf)

    # Everything after construction runs under try/finally so the consumer
    # is closed even when subscribe(), poll() or commit() raises.
    try:
        consumer.subscribe([topic])

        read_messages = 0
        while True:
            try:
                msg = consumer.poll()
                if msg is None:
                    raise Exception('Got timeout from poll() without a timeout set: %s' % msg)

                # Commit offset
                consumer.commit(msg, asynchronous=False)
                read_messages += 1
                if num_messages is not None and read_messages == num_messages:
                    print('Read all the required messages: exiting')
                    break
            except ConsumeError as e:
                # Best effort: a consume error ends the loop without failing.
                print('Consumer error: %s: ignoring' % str(e))
                break
    finally:
        consumer.close()
def get_future_key_list(arg_as_list):
    """Return the first element of ``arg_as_list``.

    Raises ``IndexError`` when the sequence is empty.
    """
    first_item = arg_as_list[0]
    return first_item
def get_future_key_TopicCollection(arg_as_TopicCollection):
    """Return the first entry of the collection's ``topic_names``."""
    topic_names = arg_as_TopicCollection.topic_names
    return get_future_key_list(topic_names)
def get_future_key(*arg):
    """Derive the key used to pick one future out of an admin result.

    Only the first positional argument is inspected:

    * a ``list`` (or subclass) yields its first element;
    * a ``TopicCollection`` (or subclass) yields its first topic name;
    * anything else, or no arguments at all, yields ``None``.
    """
    if not arg:
        return None

    first = arg[0]
    # isinstance() rather than an exact type() comparison so subclasses
    # are dispatched the same way as their base type.
    if isinstance(first, list):
        return get_future_key_list(first)
    if isinstance(first, TopicCollection):
        return get_future_key_TopicCollection(first)
    return None
def perform_admin_operation_sync(operation, *arg, **kwargs):
    """Invoke an admin ``operation`` and block until its result is ready.

    Args:
        operation: Callable invoked as ``operation(*arg, **kwargs)``.
        *arg: Positional arguments; the first one also determines the
            future key via ``get_future_key``.
        **kwargs: Keyword arguments forwarded unchanged.

    Returns:
        Whatever ``.result()`` of the selected future returns. When a
        future key is available the operation's return value is indexed
        with it first; otherwise ``.result()`` is called on the return
        value directly.
    """
    future_key = get_future_key(*arg)
    futures = operation(*arg, **kwargs)
    # Compare against None explicitly: a falsy key (e.g. an empty string)
    # is still a real key and must be used for the lookup.
    if future_key is not None:
        futures = futures[future_key]
    return futures.result()
def create_acls(admin_client, acl_bindings):
    """Create ``acl_bindings`` through ``admin_client`` and wait for the result.

    Sleeps for one second after the call returns.
    """
    operation = admin_client.create_acls
    perform_admin_operation_sync(operation, acl_bindings)
    # presumably lets the ACL change settle before the caller continues -- confirm
    time.sleep(1)
def delete_acls(admin_client, acl_binding_filters):
    """Delete ACLs matching ``acl_binding_filters`` and wait for the result.

    Sleeps for one second after the call returns.
    """
    operation = admin_client.delete_acls
    perform_admin_operation_sync(operation, acl_binding_filters)
    # presumably lets the ACL change settle before the caller continues -- confirm
    time.sleep(1)
def verify_provided_describe_for_authorized_operations(
        admin_client,
        describe_fn,
        operation_to_allow,
        operation_to_check,
        restype,
        resname,
        *arg):
    """Check ``authorized_operations`` reported by a describe call.

    Steps performed, in order:

    1. Describe with ``include_authorized_operations=False`` and expect
       ``authorized_operations`` to be ``None``.
    2. Describe with the flag set and expect both ``operation_to_allow``
       and ``operation_to_check`` to be listed.
    3. Create a literal ALLOW ACL for ``"User:sasl_user"`` on
       ``(restype, resname)`` covering ``operation_to_allow`` only.
    4. Describe again and expect ``operation_to_allow`` to be listed and
       ``operation_to_check`` to be absent.
    5. Delete every ACL matching ``(restype, resname)``.

    Args:
        admin_client: Admin client used to create and delete the ACLs.
        describe_fn: Describe operation under test.
        operation_to_allow: Operation granted by the temporary ACL.
        operation_to_check: Operation expected to disappear after step 3.
        restype: Resource type of the ACL.
        resname: Resource name of the ACL.
        *arg: Positional arguments forwarded to ``describe_fn``.

    Returns:
        The description obtained in step 4.
    """
    def describe(include_authorized_operations):
        # Every describe call shares the same 10 second request timeout.
        return perform_admin_operation_sync(
            describe_fn,
            *arg,
            request_timeout=10,
            include_authorized_operations=include_authorized_operations)

    # Check with include_authorized_operations as False
    desc = describe(False)
    assert desc.authorized_operations is None

    # Check with include_authorized_operations as True
    desc = describe(True)
    assert len(desc.authorized_operations) > 0
    assert operation_to_allow in desc.authorized_operations
    assert operation_to_check in desc.authorized_operations

    # Update Authorized Operation by creating new ACLs
    allow_binding = AclBinding(restype, resname, ResourcePatternType.LITERAL,
                               "User:sasl_user", "*", operation_to_allow,
                               AclPermissionType.ALLOW)
    create_acls(admin_client, [allow_binding])
    time.sleep(1)

    # Check with updated authorized operations
    desc = describe(True)
    assert len(desc.authorized_operations) > 0
    assert operation_to_allow in desc.authorized_operations
    assert operation_to_check not in desc.authorized_operations

    # Delete Updated ACLs
    cleanup_filter = AclBindingFilter(restype, resname, ResourcePatternType.ANY,
                                      None, None, AclOperation.ANY,
                                      AclPermissionType.ANY)
    delete_acls(admin_client, [cleanup_filter])
    time.sleep(1)

    return desc
def verify_describe_topics(admin_client, topic_name):
    """Run the authorized-operations check for ``describe_topics``.

    READ is the operation granted by the temporary ACL and DELETE the one
    expected to disappear. The returned description is then checked to be
    a non-internal topic named ``topic_name`` with exactly one partition
    (id 0) that has a leader, one replica and one in-sync replica.
    """
    topic_desc = verify_provided_describe_for_authorized_operations(
        admin_client,
        admin_client.describe_topics,
        AclOperation.READ,
        AclOperation.DELETE,
        ResourceType.TOPIC,
        topic_name,
        TopicCollection([topic_name]))

    assert topic_desc.name == topic_name
    assert len(topic_desc.partitions) == 1
    assert not topic_desc.is_internal

    partition = topic_desc.partitions[0]
    assert partition.id == 0
    assert partition.leader is not None
    assert len(partition.replicas) == 1
    assert len(partition.isr) == 1
def verify_describe_groups(cluster, admin_client, topic):
    """Run the authorized-operations check for ``describe_consumer_groups``.

    Two messages are produced to ``topic`` and consumed by the group
    ``'test-group'`` before the group is described. The group is deleted
    at the end.
    """
    # Produce some messages
    producer = cluster.producer()
    producer.produce(topic, 'Hello Python!')
    producer.produce(topic, key='Just a key')
    producer.flush()

    # Consume some messages for the group
    group = 'test-group'
    consume_messages(cluster, group, topic, 2)

    # Verify Describe Consumer Groups
    group_desc = verify_provided_describe_for_authorized_operations(
        admin_client,
        admin_client.describe_consumer_groups,
        AclOperation.READ,
        AclOperation.DELETE,
        ResourceType.GROUP,
        group,
        [group])
    assert group == group_desc.group_id
    assert group_desc.is_simple_consumer_group is False
    assert group_desc.state == ConsumerGroupState.EMPTY

    # Delete group
    perform_admin_operation_sync(
        admin_client.delete_consumer_groups,
        [group],
        request_timeout=10)
def verify_describe_cluster(admin_client):
    """Run the authorized-operations check for ``describe_cluster``.

    ALTER is the operation granted by the temporary ACL on the
    ``"kafka-cluster"`` resource and ALTER_CONFIGS the one expected to
    disappear. The returned description must carry a string cluster id,
    at least one node and a controller.
    """
    cluster_desc = verify_provided_describe_for_authorized_operations(
        admin_client,
        admin_client.describe_cluster,
        AclOperation.ALTER,
        AclOperation.ALTER_CONFIGS,
        ResourceType.BROKER,
        "kafka-cluster")

    assert isinstance(cluster_desc.cluster_id, str)
    assert len(cluster_desc.nodes) > 0
    assert cluster_desc.controller is not None
@pytest.mark.parametrize('sasl_cluster', [{'broker_cnt': 1}], indirect=True)
def test_describe_operations(sasl_cluster):
    """Exercise authorized operations for cluster, topic and group describes.

    A topic is created for the topic and group checks and deleted once
    they are done.
    """
    admin_client = sasl_cluster.admin()

    # Verify Authorized Operations in Describe Cluster
    verify_describe_cluster(admin_client)

    # Create Topic
    topic_spec = {
        "num_partitions": 1,
        "config": {"compression.type": "gzip"},
        "replication_factor": 1,
    }
    our_topic = sasl_cluster.create_topic_and_wait_propogation(
        topic_prefix,
        topic_spec,
        validate_only=False)

    # Verify Authorized Operations in Describe Topics
    verify_describe_topics(admin_client, our_topic)

    # Verify Authorized Operations in Describe Groups.
    # Skipped when the group protocol is `consumer`, because
    # describe_groups() uses a new RPC in that case.
    if not TestUtils.use_group_protocol_consumer():
        verify_describe_groups(sasl_cluster, admin_client, our_topic)

    # Delete Topic
    perform_admin_operation_sync(
        admin_client.delete_topics,
        [our_topic],
        operation_timeout=0,
        request_timeout=10)
|