File: client_base.py

package info (click to toggle)
azure-kusto-python 5.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,704 kB
  • sloc: python: 10,633; sh: 13; makefile: 3
file content (260 lines) | stat: -rw-r--r-- 10,340 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import abc
import io
import json
import uuid
from datetime import timedelta
from typing import Union, Optional, Any, NoReturn, ClassVar, TYPE_CHECKING
from urllib.parse import urljoin

from requests import Response, Session

from azure.kusto.data._cloud_settings import CloudSettings
from azure.kusto.data._token_providers import CloudInfoTokenProvider
from .client_details import ClientDetails
from .client_request_properties import ClientRequestProperties
from .exceptions import KustoServiceError, KustoThrottlingError, KustoApiError
from .kcsb import KustoConnectionStringBuilder
from .kusto_trusted_endpoints import well_known_kusto_endpoints
from .response import KustoResponseDataSet, KustoResponseDataSetV2, KustoResponseDataSetV1
from .security import _AadHelper

if TYPE_CHECKING:
    import aiohttp


class _KustoClientBase(abc.ABC):
    """Shared state and helpers for Kusto client implementations.

    Holds the normalized cluster URL, the REST endpoint URLs derived from it,
    the default request headers and the optional AAD helper.

    ``_session`` is only declared here; this class never assigns it, so it has
    to be set elsewhere (presumably by the concrete subclass - confirm) before
    ``set_proxy`` or ``validate_endpoint`` reads it.
    """

    API_VERSION = "2024-12-12"

    # Default timeouts. They are only defined here; this class does not read them itself.
    _mgmt_default_timeout: ClassVar[timedelta] = timedelta(hours=1, seconds=30)
    _query_default_timeout: ClassVar[timedelta] = timedelta(minutes=4, seconds=30)
    _streaming_ingest_default_timeout: ClassVar[timedelta] = timedelta(minutes=10)
    # NOTE(review): presumably the value passed as ``client_server_delta`` to ExecuteRequestParams - confirm in callers.
    _client_server_delta: ClassVar[timedelta] = timedelta(seconds=30)

    # None when the connection string does not request AAD authentication (see __init__).
    _aad_helper: Optional[_AadHelper]
    client_details: ClientDetails
    # Class-level default; validate_endpoint() overrides it per instance once validation has run.
    _endpoint_validated: bool = False
    _session: Union["aiohttp.ClientSession", "Session"]

    def __init__(self, kcsb: Union[KustoConnectionStringBuilder, str], is_async):
        """Initialize the state shared by all requests.

        :param kcsb: A connection string builder, or any other value, which is then passed to
            ``KustoConnectionStringBuilder`` to build one.
        :param is_async: Forwarded unchanged to ``_AadHelper`` when AAD authentication is enabled.
        """
        if not isinstance(kcsb, KustoConnectionStringBuilder):
            kcsb = KustoConnectionStringBuilder(kcsb)
        self._kcsb = kcsb
        self._proxy_url: Optional[str] = None
        self._kusto_cluster = self._kcsb.data_source

        # notice that in this context, federated actually just stands for aad auth, not aad federated auth (legacy code)
        self._aad_helper = _AadHelper(self._kcsb, is_async) if self._kcsb.aad_federated_security else None

        # urljoin() replaces the last path segment of a base URL that has no trailing slash,
        # so make sure the cluster URL ends with one before deriving the endpoints.
        if not self._kusto_cluster.endswith("/"):
            self._kusto_cluster += "/"

        # REST endpoint URLs, resolved relative to the normalized cluster URL
        self._mgmt_endpoint = urljoin(self._kusto_cluster, "v1/rest/mgmt")
        self._query_endpoint = urljoin(self._kusto_cluster, "v2/rest/query")
        self._streaming_ingest_endpoint = urljoin(self._kusto_cluster, "v1/rest/ingest/")
        self._request_headers = {
            "Accept": "application/json",
            "Accept-Encoding": "gzip,deflate",
            "x-ms-version": self.API_VERSION,
        }

        self.client_details = self._kcsb.client_details
        self._is_closed: bool = False

        self.default_database = self._kcsb.initial_catalog

    def _get_database_or_default(self, database_name: Optional[str]) -> str:
        """Return ``database_name`` if it is truthy, otherwise the client's default database."""
        return database_name or self.default_database

    def close(self):
        """Mark the client as closed. This base implementation only sets the flag."""
        self._is_closed = True

    def set_proxy(self, proxy_url: str):
        """Remember ``proxy_url`` and, when an AAD helper exists, hand it to the token provider.

        If the current session is a ``requests.Session`` it is also passed to the token provider.
        """
        self._proxy_url = proxy_url
        if self._aad_helper:
            self._aad_helper.token_provider.set_proxy(proxy_url)
            if isinstance(self._session, Session):
                self._aad_helper.token_provider.set_session(self._session)

    def validate_endpoint(self):
        """Validate the cluster URL against the trusted endpoints, once per instance.

        Does nothing when there is no AAD helper or when a previous call already completed.
        The check itself only runs when the token provider is a ``CloudInfoTokenProvider``;
        either way the instance is marked as validated afterwards. An exception raised by the
        cloud-info lookup or by the validation propagates and leaves the instance unvalidated.
        """
        if not self._endpoint_validated and self._aad_helper is not None:
            if isinstance(self._aad_helper.token_provider, CloudInfoTokenProvider):
                endpoint = CloudSettings.get_cloud_info_for_cluster(
                    self._kusto_cluster,
                    self._aad_helper.token_provider._proxy_dict,
                    # Only a requests Session is passed on; for any other session type None is passed.
                    self._session if isinstance(self._session, Session) else None,
                ).login_endpoint
                well_known_kusto_endpoints.validate_trusted_endpoint(
                    self._kusto_cluster,
                    endpoint,
                )
            self._endpoint_validated = True

    @staticmethod
    def _kusto_parse_by_endpoint(endpoint: str, response_json: Any) -> KustoResponseDataSet:
        """Wrap ``response_json`` in the data set class matching the endpoint.

        Endpoints ending with ``v2/rest/query`` yield a ``KustoResponseDataSetV2``; every other
        endpoint yields a ``KustoResponseDataSetV1``.
        """
        if endpoint.endswith("v2/rest/query"):
            return KustoResponseDataSetV2(response_json)
        return KustoResponseDataSetV1(response_json)

    @staticmethod
    def _handle_http_error(
        exception: Exception,
        endpoint: Optional[str],
        payload: Optional[io.IOBase],
        response: "Union[Response, aiohttp.ClientResponse]",
        status: int,
        response_json: Any,
        response_text: Optional[str],
    ) -> NoReturn:
        """Translate a failed HTTP response into a Kusto exception; never returns.

        Every exception is raised ``from exception`` so the original error stays attached.
        The checks run in this order: status 404, 429 and 401 first, then requests that carried
        a payload, then whatever body the response had.

        :param exception: The underlying error, used as the cause of the raised exception.
        :param endpoint: The requested URL, only used in the 404 message.
        :param payload: The request payload; a truthy value marks the request as an ingestion.
        :param response: The HTTP response, attached to the raised exception.
        :param status: The HTTP status code.
        :param response_json: The parsed response body, if any.
        :param response_text: The raw response body, if any.
        :raises KustoThrottlingError: On status 429.
        :raises KustoApiError: When ``response_json`` is truthy and no status-specific rule applied.
        :raises KustoServiceError: In all remaining cases.
        """
        if status == 404:
            if payload:
                raise KustoServiceError("The ingestion endpoint does not exist. Please enable streaming ingestion on your cluster.", response) from exception

            raise KustoServiceError(f"The requested endpoint '{endpoint}' does not exist.", response) from exception

        if status == 429:
            raise KustoThrottlingError("The request was throttled by the server.", response) from exception

        if status == 401:
            raise KustoServiceError("401. Missing adequate access rights.", response) from exception

        if payload:
            message = f"An error occurred while trying to ingest: Status: {status}, Reason: {response.reason}, Text: {response_text}."
            if response_json:
                raise KustoApiError(response_json, message, response) from exception

            raise KustoServiceError(message, response) from exception

        if response_json:
            raise KustoApiError(response_json, http_response=response) from exception

        if response_text:
            raise KustoServiceError(response_text, response) from exception

        raise KustoServiceError("Server error response contains no data.", response) from exception


class ExecuteRequestParams:
    """The body, headers and timeout of a single request.

    After construction an instance exposes ``payload``, ``json_payload``,
    ``request_headers`` and ``timeout``.
    """

    @staticmethod
    def _from_stream(
        stream: io.IOBase,
        properties: ClientRequestProperties,
        request_headers: Any,
        timeout: timedelta,
        mgmt_default_timeout: timedelta,
        client_server_delta: timedelta,
        client_details: ClientDetails,
    ):
        """Build the parameters for a request whose body is ``stream``.

        Works on a copy of ``request_headers``: sets ``Content-Encoding`` to gzip and, when
        ``properties`` is truthy, merges the ``"Options"`` entry of its JSON form into the headers.
        """
        headers = request_headers.copy()
        headers["Content-Encoding"] = "gzip"
        if properties:
            serialized = json.loads(properties.to_json())
            headers.update(serialized["Options"])

        return ExecuteRequestParams(
            payload=stream,
            json_payload=None,
            request_headers=headers,
            # Before 3.0 it was KPC.execute_streaming_ingest, but was changed to align with the other SDKs
            client_request_id_prefix="KPC.executeStreamingIngest;",
            properties=properties,
            timeout=timeout,
            mgmt_default_timeout=mgmt_default_timeout,
            client_server_delta=client_server_delta,
            client_details=client_details,
        )

    @staticmethod
    def _from_query(
        query: str,
        database: str,
        properties: ClientRequestProperties,
        request_headers: Any,
        timeout: timedelta,
        mgmt_default_timeout: timedelta,
        client_server_delta: timedelta,
        client_details: ClientDetails,
    ):
        """Build the parameters for running ``query`` against ``database``.

        The JSON body carries the database and the query text, plus the JSON form of
        ``properties`` when it is truthy. A copy of ``request_headers`` gets a JSON content type.
        """
        body = {"db": database, "csl": query}
        if properties:
            body["properties"] = properties.to_json()

        headers = request_headers.copy()
        headers["Content-Type"] = "application/json; charset=utf-8"

        return ExecuteRequestParams(
            payload=None,
            json_payload=body,
            request_headers=headers,
            client_request_id_prefix="KPC.execute;",
            properties=properties,
            timeout=timeout,
            mgmt_default_timeout=mgmt_default_timeout,
            client_server_delta=client_server_delta,
            client_details=client_details,
        )

    @staticmethod
    def _from_blob_url(
        blob: str,
        properties: ClientRequestProperties,
        request_headers: Any,
        timeout: timedelta,
        mgmt_default_timeout: timedelta,
        client_server_delta: timedelta,
        client_details: ClientDetails,
    ):
        """Build the parameters for a request that points at ``blob`` via ``sourceUri``.

        A copy of ``request_headers`` gets a JSON content type and, when ``properties`` is
        truthy, the ``"Options"`` entry of its JSON form merged in.
        """
        body = {"sourceUri": blob}

        headers = request_headers.copy()
        headers["Content-Type"] = "application/json; charset=utf-8"
        if properties:
            serialized = json.loads(properties.to_json())
            headers.update(serialized["Options"])

        return ExecuteRequestParams(
            payload=None,
            json_payload=body,
            request_headers=headers,
            client_request_id_prefix="KPC.executeStreamingIngestFromBlob;",
            properties=properties,
            timeout=timeout,
            mgmt_default_timeout=mgmt_default_timeout,
            client_server_delta=client_server_delta,
            client_details=client_details,
        )

    def __init__(
        self,
        payload,
        json_payload,
        request_headers,
        client_request_id_prefix,
        properties: ClientRequestProperties,
        timeout: timedelta,
        mgmt_default_timeout: timedelta,
        client_server_delta: timedelta,
        client_details: ClientDetails,
    ):
        """Fill in the tracing headers and work out the timeout for the request.

        ``request_headers`` is modified in place. For each tracing header the value set on
        ``properties`` wins over the client-level default; a header whose value ends up as
        ``None`` is not written at all.

        The timeout is ``mgmt_default_timeout`` when ``properties`` enables the no-request-timeout
        option, otherwise the request-timeout option of ``properties`` (falling back to
        ``timeout``). A falsy result is replaced by ``mgmt_default_timeout``, and
        ``client_server_delta`` is always added on top.
        """
        # (header name, client-level default, attribute of `properties` that overrides it, if any)
        tracing_headers = (
            ("x-ms-client-request-id", client_request_id_prefix + str(uuid.uuid4()), "client_request_id"),
            ("x-ms-client-version", client_details.version_for_tracing, None),
            ("x-ms-app", client_details.application_for_tracing, "application"),
            ("x-ms-user", client_details.user_name_for_tracing, "user"),
        )

        for header_name, fallback, override_attr in tracing_headers:
            override = getattr(properties, override_attr) if properties and override_attr else None
            chosen = fallback if override is None else override
            if chosen is None:
                continue
            # Replace any characters that aren't ascii with '?'
            request_headers[header_name] = chosen.encode("ascii", "replace").decode("ascii", "strict")

        effective_timeout = timeout
        if properties is not None:
            skip_request_timeout = properties.get_option(ClientRequestProperties.no_request_timeout_option_name, False)
            if skip_request_timeout:
                effective_timeout = mgmt_default_timeout
            else:
                effective_timeout = properties.get_option(ClientRequestProperties.request_timeout_option_name, timeout)

        if not effective_timeout:
            effective_timeout = mgmt_default_timeout

        self.payload = payload
        self.json_payload = json_payload
        self.request_headers = request_headers
        self.timeout = effective_timeout + client_server_delta