File: test_kusto_connection_string_builder.py

package info (click to toggle)
azure-kusto-python 4.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 1,184 kB
  • sloc: python: 9,329; sh: 13; makefile: 3
file content (302 lines) | stat: -rw-r--r-- 14,697 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import unittest
from uuid import uuid4

from azure.kusto.data import KustoConnectionStringBuilder


class KustoConnectionStringBuilderTests(unittest.TestCase):
    """Tests class for KustoConnectionStringBuilder."""

    PASSWORDS_REPLACEMENT = "****"

    def test_no_credentials(self):
        """Verify builders created from a bare data source hold no credentials.

        Each connection-string spelling used here must produce the same
        builder state and the same ``repr``/``str`` rendering.
        """
        connection_strings = (
            "localhost",
            "data Source=localhost",
            "Addr=localhost",
            "Addr = localhost",
        )
        builders = [KustoConnectionStringBuilder(text) for text in connection_strings]
        rendered = "Data Source=localhost;Authority Id=organizations"

        for builder in builders:
            assert builder.data_source == "localhost"
            assert not builder.aad_federated_security
            # None of the credential properties may be populated.
            for credential in ("aad_user_id", "password", "application_client_id", "application_key"):
                assert getattr(builder, credential) is None
            assert builder.authority_id == "organizations"
            # Nothing secret is present, so both renderings are expected to match.
            assert repr(builder) == rendered
            assert str(builder) == rendered

    def test_aad_app(self):
        """Checks kcsb that is created with AAD application credentials.

        Builders are produced from several connection-string spellings, from
        the ``with_aad_application_key_authentication`` factory and through
        item assignment; all of them must expose the same properties.
        ``repr`` is expected to contain the application key, while ``str`` is
        expected to contain ``PASSWORDS_REPLACEMENT`` in its place.
        """
        uuid = str(uuid4())
        key = "key of application"
        kcsbs = [
            KustoConnectionStringBuilder(
                "localhost;Application client Id={0};application Key={1};Authority Id={2} ; aad federated security = {3}".format(
                    uuid, key, "microsoft.com", True
                )
            ),
            KustoConnectionStringBuilder(
                "Data Source=localhost ; Application Client Id={0}; Appkey ={1};Authority Id= {2} ; aad federated security = {3}".format(
                    uuid, key, "microsoft.com", True
                )
            ),
            KustoConnectionStringBuilder(
                " Addr = localhost ; AppClientId = {0} ; AppKey ={1}; Authority Id={2} ; aad federated security = {3}".format(uuid, key, "microsoft.com", True)
            ),
            KustoConnectionStringBuilder(
                "Network Address = localhost; AppClientId = {0} ; AppKey ={1};AuthorityId={2} ; aad federated security = {3}".format(
                    uuid, key, "microsoft.com", True
                )
            ),
            KustoConnectionStringBuilder.with_aad_application_key_authentication("localhost", uuid, key, "microsoft.com"),
        ]

        # Make sure an error is raised when authority_id is None. assertRaises
        # also fails the test when nothing is raised at all.
        with self.assertRaises(ValueError):
            KustoConnectionStringBuilder.with_aad_application_key_authentication("localhost", uuid, key, None)

        # Same configuration, set through the ValidKeywords members.
        kcsb1 = KustoConnectionStringBuilder("server=localhost")
        kcsb1[KustoConnectionStringBuilder.ValidKeywords.application_client_id] = uuid
        kcsb1[KustoConnectionStringBuilder.ValidKeywords.application_key] = key
        kcsb1[KustoConnectionStringBuilder.ValidKeywords.authority_id] = "microsoft.com"
        kcsb1[KustoConnectionStringBuilder.ValidKeywords.aad_federated_security] = True
        kcsbs.append(kcsb1)

        # Same configuration, set through free-form string keys.
        kcsb2 = KustoConnectionStringBuilder("Server=localhost")
        kcsb2["AppclientId"] = uuid
        kcsb2["Application key"] = key
        kcsb2["Authority Id"] = "microsoft.com"
        kcsb2["aad federated security"] = True
        kcsbs.append(kcsb2)

        expected = "Data Source=localhost;AAD Federated Security=True;Application Client Id={0};Application Key={1};Authority Id={2}"

        for kcsb in kcsbs:
            assert kcsb.data_source == "localhost"
            assert kcsb.aad_federated_security
            assert kcsb.aad_user_id is None
            assert kcsb.password is None
            assert kcsb.application_client_id == uuid
            assert kcsb.application_key == key
            assert kcsb.authority_id == "microsoft.com"
            # repr keeps the real key; str replaces it with the placeholder.
            assert repr(kcsb) == expected.format(uuid, key, "microsoft.com")
            assert str(kcsb) == expected.format(uuid, self.PASSWORDS_REPLACEMENT, "microsoft.com")

    def test_aad_user(self):
        """Verify builders configured with an AAD user name and password.

        The same configuration is reached through parsed connection strings,
        the ``with_aad_user_password_authentication`` factory and item
        assignment. ``repr`` is expected to show the password and ``str`` the
        ``PASSWORDS_REPLACEMENT`` placeholder.
        """
        user = "test"
        password = "Pa$$w0rd"

        templates = (
            "localhost;AAD User ID={0};password={1} ;AAD Federated Security=True ",
            "Data Source=localhost ; AaD User ID={0}; Password ={1} ;AAD Federated Security=True",
            " Addr = localhost ; AAD User ID = {0} ; Pwd ={1} ;AAD Federated Security=True",
            "Network Address = localhost; AAD User iD = {0} ; Pwd = {1} ;AAD Federated Security= True  ",
        )
        builders = [KustoConnectionStringBuilder(template.format(user, password)) for template in templates]
        builders.append(KustoConnectionStringBuilder.with_aad_user_password_authentication("localhost", user, password))

        # Configure one builder through the ValidKeywords members ...
        keywords = KustoConnectionStringBuilder.ValidKeywords
        via_keywords = KustoConnectionStringBuilder("Server=localhost")
        for keyword, value in (
            (keywords.aad_user_id, user),
            (keywords.password, password),
            (keywords.aad_federated_security, True),
        ):
            via_keywords[keyword] = value
        builders.append(via_keywords)

        # ... and another one through plain string keys.
        via_strings = KustoConnectionStringBuilder("server=localhost")
        for name, value in (
            ("AAD User ID", user),
            ("Password", password),
            ("aad federated security", True),
        ):
            via_strings[name] = value
        builders.append(via_strings)

        rendering = "Data Source=localhost;AAD Federated Security=True;AAD User ID={0};Password={1};Authority Id=organizations"
        expected_repr = rendering.format(user, password)
        expected_str = rendering.format(user, self.PASSWORDS_REPLACEMENT)

        for builder in builders:
            assert builder.data_source == "localhost"
            assert builder.aad_federated_security
            assert builder.aad_user_id == user
            assert builder.password == password
            assert builder.application_client_id is None
            assert builder.application_key is None
            assert builder.authority_id == "organizations"
            assert repr(builder) == expected_repr
            assert str(builder) == expected_str

    def test_aad_user_with_authority(self):
        """Verify AAD user/password authentication with an explicit authority id.

        The authority passed to ``with_aad_user_password_authentication`` must
        be reported by ``authority_id`` and appear in both renderings.
        """
        user, password, authority_id = "test2", "Pa$$w0rd2", "13456"

        builder = KustoConnectionStringBuilder.with_aad_user_password_authentication("localhost", user, password, authority_id)

        assert builder.data_source == "localhost"
        assert builder.aad_federated_security
        assert builder.aad_user_id == user
        assert builder.password == password
        # Application credentials must stay unset for user authentication.
        for unset in ("application_client_id", "application_key"):
            assert getattr(builder, unset) is None
        assert builder.authority_id == authority_id

        rendering = "Data Source=localhost;AAD Federated Security=True;AAD User ID={0};Password={1};Authority Id=13456"
        # repr shows the password; str shows the placeholder instead.
        assert repr(builder) == rendering.format(user, password)
        assert str(builder) == rendering.format(user, self.PASSWORDS_REPLACEMENT)

    def test_aad_device_login(self):
        """Verify the builder returned by ``with_aad_device_authentication``.

        Federated security must be on, no credential property may be set, and
        ``repr``/``str`` must render the same text.
        """
        builder = KustoConnectionStringBuilder.with_aad_device_authentication("localhost")
        rendered = "Data Source=localhost;AAD Federated Security=True;Authority Id=organizations"

        assert builder.data_source == "localhost"
        assert builder.aad_federated_security
        for credential in ("aad_user_id", "password", "application_client_id", "application_key"):
            assert getattr(builder, credential) is None
        assert builder.authority_id == "organizations"
        assert repr(builder) == rendered
        assert str(builder) == rendered

    def test_aad_app_token(self):
        """Verify the builder created with an AAD application token.

        Only ``application_token`` may be populated; ``repr`` is expected to
        show the token and ``str`` the ``PASSWORDS_REPLACEMENT`` placeholder.
        """
        token = "The app hardest token ever"
        builder = KustoConnectionStringBuilder.with_aad_application_token_authentication("localhost", application_token=token)

        assert builder.data_source == "localhost"
        assert builder.application_token == token
        assert builder.aad_federated_security
        # Every other credential, including the user token, must be unset.
        for unset in ("aad_user_id", "password", "application_client_id", "application_key", "user_token"):
            assert getattr(builder, unset) is None
        assert builder.authority_id == "organizations"

        rendering = "Data Source=localhost;AAD Federated Security=True;Authority Id=organizations;Application Token=%s"
        assert repr(builder) == rendering % token
        assert str(builder) == rendering % self.PASSWORDS_REPLACEMENT

    def test_aad_user_token(self):
        """Verify the builder created with an AAD user token.

        Only ``user_token`` may be populated; ``repr`` is expected to show the
        token and ``str`` the ``PASSWORDS_REPLACEMENT`` placeholder.
        """
        token = "The user hardest token ever"
        builder = KustoConnectionStringBuilder.with_aad_user_token_authentication("localhost", user_token=token)

        assert builder.data_source == "localhost"
        assert builder.user_token == token
        assert builder.aad_federated_security
        # Every other credential, including the application token, must be unset.
        for unset in ("aad_user_id", "password", "application_client_id", "application_key", "application_token"):
            assert getattr(builder, unset) is None
        assert builder.authority_id == "organizations"

        rendering = "Data Source=localhost;AAD Federated Security=True;Authority Id=organizations;User Token=%s"
        assert repr(builder) == rendering % token
        assert str(builder) == rendering % self.PASSWORDS_REPLACEMENT

    def test_add_msi(self):
        """Checks kcsb that is created with managed service identity (MSI) authentication.

        Covers the argument combinations that must be rejected with
        ``ValueError``, the ``msi_parameters`` recorded for the accepted
        combinations, and that ``str(kcsb)`` is accepted by the constructor.
        """
        client_guid = "kjhjk"
        object_guid = "87687687"
        res_guid = "kajsdghdijewhag"

        # Use of object_id and msi_res_id is disabled pending support of azure-identity.
        # When version 1.4.1 is released and these parameters are supported, enable the
        # functionality and the disabled checks below again.
        with self.assertRaises(ValueError):
            KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication("localhost2", object_id=object_guid, timeout=3)

        with self.assertRaises(ValueError):
            KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication("localhost3", msi_res_id=res_guid)

        kcsbs = [
            KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication("localhost0", timeout=1),
            KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication("localhost1", client_id=client_guid, timeout=2),
            # KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication("localhost2", object_id=object_guid, timeout=3),
            # KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication("localhost3", msi_res_id=res_guid),
        ]

        # Only a timeout was given: no identity hint may be recorded.
        assert kcsbs[0].msi_authentication
        assert kcsbs[0].msi_parameters["connection_timeout"] == 1
        assert "client_id" not in kcsbs[0].msi_parameters
        assert "object_id" not in kcsbs[0].msi_parameters
        assert "msi_res_id" not in kcsbs[0].msi_parameters

        # client_id plus timeout: both are recorded, nothing else.
        assert kcsbs[1].msi_authentication
        assert kcsbs[1].msi_parameters["connection_timeout"] == 2
        assert kcsbs[1].msi_parameters["client_id"] == client_guid
        assert "object_id" not in kcsbs[1].msi_parameters
        assert "msi_res_id" not in kcsbs[1].msi_parameters

        # Disabled together with the two commented-out builders above:
        #
        # assert kcsbs[2].msi_authentication
        # assert kcsbs[2].msi_parameters["connection_timeout"] == 3
        # assert "client_id" not in kcsbs[2].msi_parameters
        # assert kcsbs[2].msi_parameters["object_id"] == object_guid
        # assert "msi_res_id" not in kcsbs[2].msi_parameters
        #
        # assert kcsbs[3].msi_authentication
        # assert "connection_timeout" not in kcsbs[3].msi_parameters
        # assert "client_id" not in kcsbs[3].msi_parameters
        # assert "object_id" not in kcsbs[3].msi_parameters
        # assert kcsbs[3].msi_parameters["msi_res_id"] == res_guid

        # A call that passes both client_id and object_id must raise ValueError.
        # assertRaises lets any other exception type propagate unchanged.
        with self.assertRaises(ValueError):
            KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication("localhost", client_id=client_guid, object_id=object_guid)

        # Check serializability.
        for kcsb in kcsbs:
            KustoConnectionStringBuilder(str(kcsb))

    def test_add_token_provider(self):
        """Checks kcsb that is created with a synchronous token provider.

        Calling ``token_provider`` on the builder must return the caller's
        token, and passing a non-callable provider must raise
        ``AssertionError``.
        """
        caller_token = "caller token"

        def token_provider():
            return caller_token

        kcsb = KustoConnectionStringBuilder.with_token_provider("localhost", token_provider)

        assert kcsb.token_provider() == caller_token

        # A plain string is not callable and must be rejected. assertRaises
        # lets any other exception type propagate unchanged.
        with self.assertRaises(AssertionError):
            KustoConnectionStringBuilder.with_token_provider("localhost", caller_token)

    def test_add_async_token_provider(self):
        """Checks kcsb that is created with an asynchronous token provider.

        Awaiting ``async_token_provider`` on the builder must yield the
        caller's token, and passing a non-callable provider must raise
        ``AssertionError``.
        """
        caller_token = "caller token"

        async def async_token_provider():
            return caller_token

        kcsb = KustoConnectionStringBuilder.with_async_token_provider("localhost", async_token_provider)

        # Drive the coroutine to completion so the returned token can be
        # checked and no un-awaited coroutine is left behind. A private loop
        # is used so the thread's current event loop is not replaced.
        loop = asyncio.new_event_loop()
        try:
            token = loop.run_until_complete(kcsb.async_token_provider())
        finally:
            loop.close()
        assert token == caller_token

        # A plain string is not callable and must be rejected. assertRaises
        # lets any other exception type propagate unchanged.
        with self.assertRaises(AssertionError):
            KustoConnectionStringBuilder.with_async_token_provider("localhost", caller_token)