File: test_user_scram_credentials.py

package info (click to toggle)
python-confluent-kafka 2.11.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,660 kB
  • sloc: python: 30,428; ansic: 9,487; sh: 1,477; makefile: 192
file content (136 lines) | stat: -rw-r--r-- 5,008 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# -*- coding: utf-8 -*-
# Copyright 2023 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import concurrent
import pytest

from confluent_kafka.admin import UserScramCredentialsDescription, UserScramCredentialUpsertion, \
    UserScramCredentialDeletion, ScramCredentialInfo, \
    ScramMechanism
from confluent_kafka.error import KafkaException, KafkaError

from tests.common import TestUtils


def _assert_user_not_found(admin_client, user):
    """Assert that describing ``user`` fails with ``RESOURCE_NOT_FOUND``."""
    futmap = admin_client.describe_user_scram_credentials([user])
    assert isinstance(futmap, dict)
    assert len(futmap) == 1
    assert user in futmap
    with pytest.raises(KafkaException) as ex:
        futmap[user].result()
    assert ex.value.args[0] == KafkaError.RESOURCE_NOT_FOUND


def _alter_and_wait(admin_client, user, alterations):
    """Apply ``alterations`` and assert the future for ``user`` resolves to None."""
    futmap = admin_client.alter_user_scram_credentials(alterations)
    assert futmap[user].result() is None


def _assert_credentials_match(description, mechanism, iterations):
    """
    Assert ``description`` holds at least one credential and that every
    credential uses the given ``mechanism`` and ``iterations``.
    """
    assert isinstance(description, UserScramCredentialsDescription)
    infos = description.scram_credential_infos
    # Guard against the per-credential loop passing vacuously on an empty list.
    assert len(infos) > 0
    for info in infos:
        assert info.mechanism == mechanism
        assert info.iterations == iterations


def test_user_scram_credentials(kafka_cluster):
    """
    Tests for the alter and describe SASL/SCRAM credential operations.

    Scenario: verify the user is initially unknown, upsert a SCRAM-SHA-256
    credential, upsert it again together with a SCRAM-SHA-512 credential,
    delete the SCRAM-SHA-512 credential, describe all users and the specific
    user, delete the remaining credential and verify the user is unknown
    again.
    """
    # Import the submodule explicitly: a bare ``import concurrent`` does not
    # load ``concurrent.futures``, so the attribute access below would only
    # work if some other module happened to import it first.
    import concurrent.futures

    admin_client = kafka_cluster.admin()

    newuser = "non-existent"
    mechanism = ScramMechanism.SCRAM_SHA_256
    iterations = 10000
    password = b"password"
    salt = b"salt"

    # The user must not exist before the test starts.
    _assert_user_not_found(admin_client, newuser)

    # Insert new user with SCRAM_SHA_256: 10000
    sha256_upsertion = UserScramCredentialUpsertion(
        newuser, ScramCredentialInfo(mechanism, iterations), password, salt)
    _alter_and_wait(admin_client, newuser, [sha256_upsertion])

    # NOTE(review): the fixed sleeps presumably give the cluster time to
    # propagate the change before the next request - confirm.
    time.sleep(1)

    # Try upsertion for newuser,SCRAM_SHA_256 and add newuser,SCRAM_SHA_512
    # (the SCRAM_SHA_512 upsertion is created without an explicit salt).
    sha512_upsertion = UserScramCredentialUpsertion(
        newuser,
        ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, iterations),
        password)

    if TestUtils.use_kraft():
        # NOTE(review): with KRaft the two upsertions are sent as separate
        # requests - presumably one request carrying two alterations for the
        # same user is not accepted there; confirm.
        _alter_and_wait(admin_client, newuser, [sha256_upsertion])
        _alter_and_wait(admin_client, newuser, [sha512_upsertion])
    else:
        _alter_and_wait(admin_client, newuser,
                        [sha256_upsertion, sha512_upsertion])

    time.sleep(1)

    # Delete newuser,SCRAM_SHA_512
    _alter_and_wait(admin_client, newuser,
                    [UserScramCredentialDeletion(newuser,
                                                 ScramMechanism.SCRAM_SHA_512)])

    time.sleep(1)

    # Describe all users: both calling without arguments and passing None
    # return a single future instead of a per-user dict.
    for args in [[], [None]]:
        f = admin_client.describe_user_scram_credentials(*args)
        assert isinstance(f, concurrent.futures.Future)
        results = f.result()
        assert newuser in results
        _assert_credentials_match(results[newuser], mechanism, iterations)

    # Describe specific user
    futmap = admin_client.describe_user_scram_credentials([newuser])
    assert isinstance(futmap, dict)
    assert len(futmap) == 1
    assert newuser in futmap
    _assert_credentials_match(futmap[newuser].result(), mechanism, iterations)

    # Delete newuser
    futmap = admin_client.alter_user_scram_credentials(
        [UserScramCredentialDeletion(newuser, mechanism)])
    assert isinstance(futmap, dict)
    assert len(futmap) == 1
    assert newuser in futmap
    assert futmap[newuser].result() is None

    time.sleep(1)

    # newuser isn't found
    _assert_user_not_found(admin_client, newuser)