File: kcsb.py

package info (click to toggle)
azure-kusto-python 5.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,704 kB
  • sloc: python: 10,633; sh: 13; makefile: 3
file content (650 lines) | stat: -rw-r--r-- 27,679 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
import dataclasses
from enum import unique, Enum
from typing import Union, Callable, Coroutine, Optional, Tuple, List, Any, ClassVar
from urllib.parse import urlparse

from ._string_utils import assert_string_is_not_empty
from ._token_providers import DeviceCallbackType
from .client_details import ClientDetails
from .helpers import load_bundled_json


# Sentinel stored in the keyword lookup table (see Keyword.init_lookup) for keywords
# that are listed in the bundled keyword file but are not supported by this SDK.
UNSUPPORTED_KEYWORD = "UNSUPPORTED"


@unique
class SupportedKeywords(Enum):
    """
    Connection string keywords that KustoConnectionStringBuilder can parse and store.

    Each value is the canonical spelling written out when a connection string is rendered.
    The declaration order is also the order in which keywords are emitted by
    KustoConnectionStringBuilder._build_connection_string, which iterates over this enum.
    """

    DATA_SOURCE = "Data Source"
    INITIAL_CATALOG = "Initial Catalog"
    FEDERATED_SECURITY = "AAD Federated Security"
    APPLICATION_CLIENT_ID = "Application Client Id"
    APPLICATION_KEY = "Application Key"
    USER_ID = "User ID"
    PASSWORD = "Password"
    AUTHORITY_ID = "Authority Id"
    APPLICATION_TOKEN = "Application Token"
    USER_TOKEN = "User Token"
    APPLICATION_CERTIFICATE_BLOB = "Application Certificate Blob"
    APPLICATION_CERTIFICATE_X5C = "Application Certificate SendX5c"
    APPLICATION_CERTIFICATE_THUMBPRINT = "Application Certificate Thumbprint"
    # The two tracing keywords are stored as plain attributes on the builder
    # (application_for_tracing / user_name_for_tracing) rather than in its internal dict.
    TRACE_APP_NAME = "Application Name for Tracing"
    TRACE_USER_NAME = "User Name for Tracing"


@unique
class UnsupportedKeywords(Enum):
    """
    Connection string keywords that are recognised but rejected.

    Keyword.init_lookup maps these names (and their aliases) to UNSUPPORTED_KEYWORD, and
    Keyword.parse raises a KeyError stating that the keyword is not supported by this SDK.
    """

    DSTS_FEDERATED_SECURITY = "dSTS Federated Security"
    STREAMING = "Streaming"
    UNCOMPRESSED = "Uncompressed"
    ENFORCE_MFA = "EnforceMfa"
    ACCEPT = "Accept"
    QUERY_CONSISTENCY = "Query Consistency"
    DATA_SOURCE_URI = "Data Source Uri"
    AZURE_REGION = "Azure Region"
    NAMESPACE = "Namespace"
    APPLICATION_CERTIFICATE_ISSUER_DISTINGUISHED_NAME = "Application Certificate Issuer Distinguished Name"
    APPLICATION_CERTIFICATE_SUBJECT_DISTINGUISHED_NAME = "Application Certificate Subject Distinguished Name"


@dataclasses.dataclass(frozen=True)
class Keyword:
    """
    Immutable description of one supported connection string keyword.

    Instances are created by :meth:`init_lookup` from the bundled ``kcsb.json`` file and are
    retrieved through :meth:`parse` or :meth:`lookup`. Keys of the lookup table are normalized
    (lower-cased, spaces removed) spellings of keyword names and their aliases.
    """

    # Canonical spellings taken from the two enums; used to classify entries of kcsb.json.
    _supported_keywords: ClassVar[List[str]] = [member.value for member in SupportedKeywords]
    _unsupported_keywords: ClassVar[List[str]] = [member.value for member in UnsupportedKeywords]
    # normalized spelling -> Keyword instance, or UNSUPPORTED_KEYWORD. Filled by init_lookup().
    _lookup: ClassVar[dict]

    name: SupportedKeywords
    type: str
    secret: bool

    def is_str_type(self) -> bool:
        """Return True when the keyword's declared type is ``"string"``."""
        return "string" == self.type

    def is_bool_type(self) -> bool:
        """Return True when the keyword's declared type is ``"bool"``."""
        return "bool" == self.type

    @staticmethod
    def normalize_string(key: str) -> str:
        """Return ``key`` lower-cased with every space character removed."""
        return "".join(key.lower().split(" "))

    @classmethod
    def init_lookup(cls):
        """
        Build the class-level lookup table from the bundled ``kcsb.json`` file.

        Every entry's name and each of its aliases is registered under its normalized spelling.
        :raises KeyError: if an entry's name is in neither SupportedKeywords nor UnsupportedKeywords.
        """
        table = {}
        definitions: dict = load_bundled_json("kcsb.json")
        for entry in definitions["keywords"]:
            canonical = entry["name"]
            if canonical in cls._supported_keywords:
                resolved = Keyword(SupportedKeywords(canonical), entry["type"], entry["secret"])
            elif canonical in cls._unsupported_keywords:
                resolved = UNSUPPORTED_KEYWORD
            else:
                raise KeyError(f"Unknown keyword: `{canonical}`")

            # The canonical name and all aliases resolve to the same object.
            for spelling in (canonical, *entry["aliases"]):
                table[Keyword.normalize_string(spelling)] = resolved

        cls._lookup = table

    @classmethod
    def parse(cls, key: Union[str, SupportedKeywords]) -> "Keyword":
        """
        Resolve a keyword name, alias or SupportedKeywords member to its Keyword.

        :param key: keyword spelling (case and spaces are ignored) or enum member.
        :raises KeyError: if the keyword is unknown, or known but not supported by this SDK.
        """
        text = key.value if isinstance(key, SupportedKeywords) else key
        normalized = Keyword.normalize_string(text)

        if normalized in cls._lookup:
            entry = cls._lookup[normalized]
            if entry == UNSUPPORTED_KEYWORD:
                raise KeyError(f"Keyword `{text}` is not supported by this SDK")
            return entry

        raise KeyError(f"Unknown keyword: `{text}`")

    @classmethod
    def lookup(cls, key: Union[str, SupportedKeywords]) -> "Keyword":
        """
        Return the raw lookup table entry for ``key`` without the checks done by :meth:`parse`.

        A missing key surfaces as the plain KeyError of the dict access, and an unsupported
        keyword yields the UNSUPPORTED_KEYWORD sentinel rather than a Keyword instance.
        """
        text = key.value if isinstance(key, SupportedKeywords) else key
        return cls._lookup[Keyword.normalize_string(text)]


# Populate Keyword._lookup once at import time so Keyword.parse / Keyword.lookup can be used
# by every KustoConnectionStringBuilder instance.
Keyword.init_lookup()


class KustoConnectionStringBuilder:
    """
    Parses Kusto connection strings.

    Keyword/value pairs are kept in an internal dict keyed by SupportedKeywords; settings that
    have no connection string keyword (login mode flags, callbacks, credentials) are kept in the
    attributes declared below. The ``with_*`` class methods create a builder for a specific
    authentication method.

    For usages, check out the sample at:
        https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/tests/sample.py
    """

    # Database name used when the connection string does not provide an initial catalog.
    DEFAULT_DATABASE_NAME = "NetDefaultDB"

    # Login mode flags; each is switched on by the matching factory method
    # (with_interactive_login, with_az_cli_authentication, with_aad_device_authentication,
    # with_azure_token_credential).
    interactive_login: bool = False
    az_cli_login: bool = False
    device_login: bool = False
    token_credential_login: bool = False

    # Optional callback supplied to with_aad_device_authentication.
    device_callback: DeviceCallbackType = None
    # Set by with_aad_managed_service_identity_authentication, together with its parameters dict.
    msi_authentication: bool = False
    msi_parameters: Optional[dict] = None

    # Callables supplied to with_token_provider / with_async_token_provider.
    token_provider: Optional[Callable[[], str]] = None
    async_token_provider: Optional[Callable[[], Coroutine[None, None, str]]] = None

    # Tracing identifiers; filled from the tracing keywords of the connection string or by
    # _set_connector_details, and exposed through the client_details property.
    application_for_tracing: Optional[str] = None
    user_name_for_tracing: Optional[str] = None

    # Values supplied to with_azure_token_credential.
    azure_credential: Optional[Any] = None
    azure_credential_from_login_endpoint: Optional[Any] = None

    # Public certificate supplied to with_aad_application_certificate_sni_authentication.
    application_public_certificate: Optional[str] = None

    def __init__(self, connection_string: str):
        """
        Creates new KustoConnectionStringBuilder.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net;AAD User ID="user@microsoft.com";Password=P@ssWord
        A string whose first segment contains no ``=`` is treated as ``Data Source=<string>``.
        Empty segments (for example a trailing ``;``) are ignored.
        For more information please look at:
        https://kusto.azurewebsites.net/docs/concepts/kusto_connection_strings.html
        :raises KeyError: if a keyword is unknown or unsupported, or a boolean keyword has a
        value other than True/true/False/false.
        """
        assert_string_is_not_empty(connection_string)
        self._internal_dict = {}

        # A bare URL (no `key=` in the first segment) is shorthand for `Data Source=<url>`.
        if connection_string is not None and "=" not in connection_string.partition(";")[0]:
            connection_string = "Data Source=" + connection_string

        # Default authority; an authority given in the connection string overrides it below.
        self[SupportedKeywords.AUTHORITY_ID] = "organizations"

        for kvp_string in connection_string.split(";"):
            # Tolerate trailing or doubled semicolons instead of failing with
            # "Unknown keyword: ``" on the empty segment they produce.
            if not kvp_string.strip():
                continue

            key, _, value = kvp_string.partition("=")
            keyword = Keyword.parse(key)

            value_stripped = value.strip()
            if keyword.is_str_type():
                if keyword.name == SupportedKeywords.DATA_SOURCE:
                    self[keyword.name] = value_stripped.rstrip("/")
                    # May move a single path segment of the URL into the initial catalog.
                    self._parse_data_source(self.data_source)
                elif keyword.name == SupportedKeywords.TRACE_USER_NAME:
                    self.user_name_for_tracing = value_stripped
                elif keyword.name == SupportedKeywords.TRACE_APP_NAME:
                    self.application_for_tracing = value_stripped
                else:
                    self[keyword.name] = value_stripped
            elif keyword.is_bool_type():
                if value_stripped in ["True", "true"]:
                    self[keyword.name] = True
                elif value_stripped in ["False", "false"]:
                    self[keyword.name] = False
                else:
                    # Name the keyword that was actually being parsed; the exception type is
                    # kept as KeyError for callers that already catch it.
                    raise KeyError("Expected %s to be bool. Received %s" % (keyword.name.value, value))

        if self.initial_catalog is None:
            self.initial_catalog = self.DEFAULT_DATABASE_NAME

    def __setitem__(self, key: "Union[SupportedKeywords, str]", value: Union[str, bool, dict]):
        """
        Store ``value`` under the keyword identified by ``key``.

        String keywords are stored stripped of surrounding whitespace; boolean keywords must be
        given an actual ``bool``.
        :raises KeyError: if the keyword is unknown/unsupported or has a type other than string or bool.
        :raises TypeError: if ``value`` is None, or a boolean keyword is given a non-bool value.
        """
        parsed = Keyword.parse(key)

        if value is None:
            raise TypeError("Value cannot be None.")

        if parsed.is_str_type():
            stored = value.strip()
        elif parsed.is_bool_type():
            if not isinstance(value, bool):
                raise TypeError("Expected %s to be bool" % key)
            stored = value
        else:
            raise KeyError("KustoConnectionStringBuilder supports only bools and strings.")

        self._internal_dict[parsed.name] = stored

    @classmethod
    def with_aad_user_password_authentication(
        cls, connection_string: str, user_id: str, password: str, authority_id: str = "organizations"
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with an AAD user name
        and password.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param str user_id: AAD user ID.
        :param str password: Corresponding password of the AAD user.
        :param str authority_id: optional param. defaults to "organizations"
        """
        for required in (user_id, password):
            assert_string_is_not_empty(required)

        builder = cls(connection_string)
        settings = (
            (SupportedKeywords.FEDERATED_SECURITY, True),
            (SupportedKeywords.USER_ID, user_id),
            (SupportedKeywords.AUTHORITY_ID, authority_id),
            (SupportedKeywords.PASSWORD, password),
        )
        for keyword, setting in settings:
            builder[keyword] = setting

        return builder

    @classmethod
    def with_aad_user_token_authentication(cls, connection_string: str, user_token: str) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with an existing
        AAD user token.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net
        :param str user_token: AAD user token.
        """
        assert_string_is_not_empty(user_token)

        builder = cls(connection_string)
        for keyword, setting in (
            (SupportedKeywords.FEDERATED_SECURITY, True),
            (SupportedKeywords.USER_TOKEN, user_token),
        ):
            builder[keyword] = setting

        return builder

    @classmethod
    def with_aad_application_key_authentication(
        cls, connection_string: str, aad_app_id: str, app_key: str, authority_id: str
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with an AAD application
        id and its key.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param str aad_app_id: AAD application ID.
        :param str app_key: Corresponding key of the AAD application.
        :param str authority_id: Authority id (aka Tenant id) must be provided
        """
        for required in (aad_app_id, app_key, authority_id):
            assert_string_is_not_empty(required)

        builder = cls(connection_string)
        settings = (
            (SupportedKeywords.FEDERATED_SECURITY, True),
            (SupportedKeywords.APPLICATION_CLIENT_ID, aad_app_id),
            (SupportedKeywords.APPLICATION_KEY, app_key),
            (SupportedKeywords.AUTHORITY_ID, authority_id),
        )
        for keyword, setting in settings:
            builder[keyword] = setting

        return builder

    @classmethod
    def with_aad_application_certificate_authentication(
        cls, connection_string: str, aad_app_id: str, certificate: str, thumbprint: str, authority_id: str
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with an AAD application
        and a certificate.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net
        :param str aad_app_id: AAD application ID.
        :param str certificate: A PEM encoded certificate private key.
        :param str thumbprint: hex encoded thumbprint of the certificate.
        :param str authority_id: Authority id (aka Tenant id) must be provided
        """
        for required in (aad_app_id, certificate, thumbprint, authority_id):
            assert_string_is_not_empty(required)

        builder = cls(connection_string)
        settings = (
            (SupportedKeywords.FEDERATED_SECURITY, True),
            (SupportedKeywords.APPLICATION_CLIENT_ID, aad_app_id),
            (SupportedKeywords.APPLICATION_CERTIFICATE_BLOB, certificate),
            (SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT, thumbprint),
            (SupportedKeywords.AUTHORITY_ID, authority_id),
        )
        for keyword, setting in settings:
            builder[keyword] = setting

        return builder

    @classmethod
    def with_aad_application_certificate_sni_authentication(
        cls, connection_string: str, aad_app_id: str, private_certificate: str, public_certificate: str, thumbprint: str, authority_id: str
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with an AAD application
        using a certificate Subject Name and Issuer.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net
        :param str aad_app_id: AAD application ID.
        :param str private_certificate: A PEM encoded certificate private key.
        :param str public_certificate: A public certificate matching the provided PEM certificate private key.
        :param str thumbprint: hex encoded thumbprint of the certificate.
        :param str authority_id: Authority id (aka Tenant id) must be provided
        """
        for required in (aad_app_id, private_certificate, public_certificate, thumbprint, authority_id):
            assert_string_is_not_empty(required)

        builder = cls(connection_string)
        # The public certificate has no connection string keyword, so it lives on the instance.
        builder.application_public_certificate = public_certificate
        settings = (
            (SupportedKeywords.FEDERATED_SECURITY, True),
            (SupportedKeywords.APPLICATION_CLIENT_ID, aad_app_id),
            (SupportedKeywords.APPLICATION_CERTIFICATE_BLOB, private_certificate),
            (SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT, thumbprint),
            (SupportedKeywords.AUTHORITY_ID, authority_id),
        )
        for keyword, setting in settings:
            builder[keyword] = setting

        return builder

    @classmethod
    def with_aad_application_token_authentication(cls, connection_string: str, application_token: str) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with an existing
        AAD application token.
        :param str connection_string: Kusto connection string should be of the format:
        https://<clusterName>.kusto.windows.net
        :param str application_token: AAD application token.
        """
        assert_string_is_not_empty(application_token)

        builder = cls(connection_string)
        for keyword, setting in (
            (SupportedKeywords.FEDERATED_SECURITY, True),
            (SupportedKeywords.APPLICATION_TOKEN, application_token),
        ):
            builder[keyword] = setting

        return builder

    @classmethod
    def with_aad_device_authentication(
        cls, connection_string: str, authority_id: str = "organizations", callback: DeviceCallbackType = None
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate through the AAD
        device login flow.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param str authority_id: optional param. defaults to "organizations"
        :param DeviceCallbackType callback: optional callback function to be called when authentication is required, accepts three parameters:
                - ``verification_uri`` (str) the URL the user must visit
                - ``user_code`` (str) the code the user must enter there
                - ``expires_on`` (datetime.datetime) the UTC time at which the code will expire
        """
        builder = cls(connection_string)
        builder.device_login = True
        builder.device_callback = callback

        for keyword, setting in (
            (SupportedKeywords.FEDERATED_SECURITY, True),
            (SupportedKeywords.AUTHORITY_ID, authority_id),
        ):
            builder[keyword] = setting

        return builder

    @classmethod
    def with_az_cli_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will use an existing authenticated
        az cli profile.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        """
        builder = cls(connection_string)
        federated = SupportedKeywords.FEDERATED_SECURITY
        builder[federated] = True
        builder.az_cli_login = True

        return builder

    @classmethod
    def with_aad_managed_service_identity_authentication(
        cls, connection_string: str, client_id: str = None, object_id: str = None, msi_res_id: str = None, timeout: int = None
    ) -> "KustoConnectionStringBuilder":
        """
        Creates a KustoConnection string builder that will authenticate with AAD application, using
        an application token obtained from a Microsoft Service Identity endpoint. An optional user
        assigned application ID can be added to the token.

        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param client_id: an optional user assigned identity provided as an Azure ID of a client
        :param object_id: an optional user assigned identity provided as an Azure ID of an object
                (currently rejected, see below)
        :param msi_res_id: an optional user assigned identity provided as an Azure ID of an MSI resource
                (currently rejected, see below)
        :param timeout: an optional timeout (seconds) to wait for an MSI Authentication to occur
        :raises ValueError: if ``object_id`` or ``msi_res_id`` is provided; only ``client_id`` is
                accepted as a hint for a user managed service identity.
        """
        kcsb = cls(connection_string)

        # Only client_id is accepted as a user managed identity hint. Because the other two
        # hints are always rejected, at most one hint can ever be set, so no separate
        # mutual-exclusion check is needed.
        if object_id is not None:
            raise ValueError("User Managed Service Identity with object_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead.")

        if msi_res_id is not None:
            raise ValueError(
                "User Managed Service Identity with msi_res_id is temporarily not supported by azure identity 1.3.1. Please use client_id instead."
            )

        params = {}
        if timeout is not None:
            params["connection_timeout"] = timeout
        if client_id is not None:
            params["client_id"] = client_id

        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb.msi_authentication = True
        kcsb.msi_parameters = params

        return kcsb

    @classmethod
    def with_token_provider(cls, connection_string: str, token_provider: Callable[[], str]) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder that uses a callback function to obtain a connection token
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param token_provider: a parameterless function that returns a valid bearer token for the relevant kusto resource as a string
        :raises AssertionError: if ``token_provider`` is not callable
        """

        # Explicit check instead of an `assert` statement, which is removed under `python -O`.
        # AssertionError is kept so callers that already catch it are unaffected.
        if not callable(token_provider):
            raise AssertionError("token_provider must be callable")

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb.token_provider = token_provider

        return kcsb

    @classmethod
    def with_async_token_provider(
        cls,
        connection_string: str,
        async_token_provider: Callable[[], Coroutine[None, None, str]],
    ) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder that uses an async callback function to obtain a connection token
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param async_token_provider: a parameterless function that after awaiting returns a valid bearer token for the relevant kusto resource as a string
        :raises AssertionError: if ``async_token_provider`` is not callable
        """

        # Explicit check instead of an `assert` statement, which is removed under `python -O`.
        # AssertionError is kept so callers that already catch it are unaffected.
        if not callable(async_token_provider):
            raise AssertionError("async_token_provider must be callable")

        kcsb = cls(connection_string)
        kcsb[SupportedKeywords.FEDERATED_SECURITY] = True
        kcsb.async_token_provider = async_token_provider

        return kcsb

    @classmethod
    def with_interactive_login(
        cls, connection_string: str, user_id_hint: Optional[str] = None, tenant_hint: Optional[str] = None
    ) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder flagged for interactive login.
        :param str connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param user_id_hint: optional value stored under the User ID keyword
        :param tenant_hint: optional value stored under the Authority Id keyword
        """
        builder = cls(connection_string)
        builder.interactive_login = True
        builder[SupportedKeywords.FEDERATED_SECURITY] = True

        # Hints are only written when given, so the defaults set by the constructor survive.
        optional_hints = (
            (SupportedKeywords.USER_ID, user_id_hint),
            (SupportedKeywords.AUTHORITY_ID, tenant_hint),
        )
        for keyword, hint in optional_hints:
            if hint is not None:
                builder[keyword] = hint

        return builder

    @classmethod
    def with_azure_token_credential(
        cls,
        connection_string: str,
        credential: Optional[Any] = None,
        credential_from_login_endpoint: Optional[Callable[[str], Any]] = None,
    ) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder that uses an azure token credential to obtain a connection token.
        :param connection_string: Kusto connection string should be of the format: https://<clusterName>.kusto.windows.net
        :param credential: an optional token credential to use for authentication
        :param credential_from_login_endpoint: an optional function that returns a token credential for the relevant kusto resource
        """
        builder = cls(connection_string)

        builder.azure_credential_from_login_endpoint = credential_from_login_endpoint
        builder.azure_credential = credential
        builder.token_credential_login = True
        builder[SupportedKeywords.FEDERATED_SECURITY] = True

        return builder

    @classmethod
    def with_no_authentication(cls, connection_string: str) -> "KustoConnectionStringBuilder":
        """
        Create a KustoConnectionStringBuilder that uses no authentication.
        :param connection_string: Kusto's connection string should be of the format: http://<clusterName>.kusto.windows.net
        :raises ValueError: if the connection string does not start with ``http://``
        """
        if connection_string.startswith("http://"):
            builder = cls(connection_string)
            builder[SupportedKeywords.FEDERATED_SECURITY] = False
            return builder

        raise ValueError("Connection string must start with http://")

    @property
    def data_source(self) -> Optional[str]:
        """The URI specifying the Kusto service endpoint, or None when it has not been set.
        For example, https://kuskus.kusto.windows.net or net.tcp://localhost
        """
        keyword = SupportedKeywords.DATA_SOURCE
        return self._internal_dict.get(keyword)

    @property
    def initial_catalog(self) -> Optional[str]:
        """The default database to be used for requests.
        The constructor sets it to 'NetDefaultDB' when the connection string does not provide one.
        """
        keyword = SupportedKeywords.INITIAL_CATALOG
        return self._internal_dict.get(keyword)

    @initial_catalog.setter
    def initial_catalog(self, value: str) -> None:
        # Written straight into the internal dict, so the value is stored as given
        # (no stripping or None check as done by __setitem__).
        store = self._internal_dict
        store[SupportedKeywords.INITIAL_CATALOG] = value

    @property
    def aad_user_id(self) -> Optional[str]:
        """The username to use for AAD Federated AuthN, or None when it has not been set."""
        keyword = SupportedKeywords.USER_ID
        return self._internal_dict.get(keyword)

    @property
    def application_client_id(self) -> Optional[str]:
        """The application client id to use for authentication when federated
        authentication is used, or None when it has not been set.
        """
        keyword = SupportedKeywords.APPLICATION_CLIENT_ID
        return self._internal_dict.get(keyword)

    @property
    def application_key(self) -> Optional[str]:
        """The application key to use for authentication when federated authentication is used, or None when it has not been set."""
        keyword = SupportedKeywords.APPLICATION_KEY
        return self._internal_dict.get(keyword)

    @property
    def application_certificate(self) -> Optional[str]:
        """A PEM encoded certificate private key, or None when it has not been set."""
        keyword = SupportedKeywords.APPLICATION_CERTIFICATE_BLOB
        return self._internal_dict.get(keyword)

    @application_certificate.setter
    def application_certificate(self, value: str):
        # Routed through __setitem__, so the value is validated and stripped.
        keyword = SupportedKeywords.APPLICATION_CERTIFICATE_BLOB
        self[keyword] = value

    @property
    def application_certificate_thumbprint(self) -> Optional[str]:
        """Hex encoded thumbprint of the certificate, or None when it has not been set."""
        keyword = SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT
        return self._internal_dict.get(keyword)

    @application_certificate_thumbprint.setter
    def application_certificate_thumbprint(self, value: str):
        # Routed through __setitem__, so the value is validated and stripped.
        keyword = SupportedKeywords.APPLICATION_CERTIFICATE_THUMBPRINT
        self[keyword] = value

    @property
    def authority_id(self) -> Optional[str]:
        """The ID of the AAD tenant where the application is configured.
        (should be supplied only for non-Microsoft tenant)
        The constructor initialises it to "organizations"."""
        keyword = SupportedKeywords.AUTHORITY_ID
        return self._internal_dict.get(keyword)

    @authority_id.setter
    def authority_id(self, value: str):
        # Routed through __setitem__, so the value is validated and stripped.
        keyword = SupportedKeywords.AUTHORITY_ID
        self[keyword] = value

    @property
    def aad_federated_security(self) -> Optional[bool]:
        """A Boolean value that instructs the client to perform AAD federated authentication; None when it has not been set."""
        keyword = SupportedKeywords.FEDERATED_SECURITY
        return self._internal_dict.get(keyword)

    @property
    def user_token(self) -> Optional[str]:
        """User token, or None when it has not been set."""
        keyword = SupportedKeywords.USER_TOKEN
        return self._internal_dict.get(keyword)

    @property
    def application_token(self) -> Optional[str]:
        """Application token, or None when it has not been set."""
        keyword = SupportedKeywords.APPLICATION_TOKEN
        return self._internal_dict.get(keyword)

    @property
    def client_details(self) -> ClientDetails:
        """A new ClientDetails built from the current application and user tracing names."""
        application = self.application_for_tracing
        user = self.user_name_for_tracing
        return ClientDetails(application, user)

    @property
    def login_hint(self) -> Optional[str]:
        """The value stored under the User ID keyword (the same value as aad_user_id)."""
        keyword = SupportedKeywords.USER_ID
        return self._internal_dict.get(keyword)

    @property
    def domain_hint(self) -> Optional[str]:
        """The value stored under the Authority Id keyword (the same value as authority_id)."""
        keyword = SupportedKeywords.AUTHORITY_ID
        return self._internal_dict.get(keyword)

    @property
    def password(self) -> Optional[str]:
        """The value stored under the Password keyword, or None when it has not been set."""
        keyword = SupportedKeywords.PASSWORD
        return self._internal_dict.get(keyword)

    def _set_connector_details(
        self,
        name: str,
        version: str,
        app_name: Optional[str] = None,
        app_version: Optional[str] = None,
        send_user: bool = False,
        override_user: Optional[str] = None,
        additional_fields: Optional[List[Tuple[str, str]]] = None,
    ):
        """
        Sets the connector details for tracing purposes.

        The arguments are forwarded to ClientDetails.set_connector_details and the resulting
        application and user names replace the ones currently held by this builder.
        :param name:  The name of the connector
        :param version:  The version of the connector
        :param app_name: The name of the containing application
        :param app_version: The version of the containing application
        :param send_user: Whether to send the user name
        :param override_user: Override the user name ( if send_user is True )
        :param additional_fields: Additional fields to add to the header
        """
        details = ClientDetails.set_connector_details(
            name,
            version,
            app_name,
            app_version,
            send_user,
            override_user,
            additional_fields,
        )

        # Copy both tracing names from the computed details onto this builder.
        for attribute in ("application_for_tracing", "user_name_for_tracing"):
            setattr(self, attribute, getattr(details, attribute))

    def __str__(self) -> str:
        """Render the connection string with every secret keyword's value replaced by ``****``."""
        masked = {
            keyword: ("****" if Keyword.lookup(keyword).secret else stored)
            for keyword, stored in self._internal_dict.items()
        }
        return self._build_connection_string(masked)

    def __repr__(self) -> str:
        """Render the connection string with all stored values as-is.

        NOTE: unlike ``__str__`` nothing is masked here, so the result contains secrets in clear text.
        """
        stored_values = self._internal_dict
        return self._build_connection_string(stored_values)

    def _build_connection_string(self, kcsb_as_dict: dict) -> str:
        """
        Join the entries of ``kcsb_as_dict`` into a ``key=value;key=value`` string.

        Keywords are emitted in the declaration order of SupportedKeywords, not in dict order;
        keywords missing from the dict are skipped.
        """
        rendered = []
        for word in SupportedKeywords:
            if word in kcsb_as_dict:
                rendered.append("{0}={1}".format(word.value, kcsb_as_dict[word]))
        return ";".join(rendered)

    def _parse_data_source(self, url: str):
        """
        Extract a database name embedded in the data source URL.

        When the URL has a network location, its path consists of exactly one non-empty segment
        and no initial catalog is set yet, that segment becomes the initial catalog and the
        stored data source is replaced by the URL without its path. Otherwise nothing changes.
        """
        parsed = urlparse(url)
        if parsed.netloc:
            parts = parsed.path.lstrip("/").split("/")
            if len(parts) == 1 and parts[0] and not self.initial_catalog:
                self.initial_catalog = parts[0]
                self._internal_dict[SupportedKeywords.DATA_SOURCE] = parsed._replace(path="").geturl()