File: ingest_client.py

package info (click to toggle)
azure-kusto-python 5.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,704 kB
  • sloc: python: 10,633; sh: 13; makefile: 3
file content (205 lines) | stat: -rw-r--r-- 10,667 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import random
from typing import Union, AnyStr, IO, List, Optional, Dict
from urllib.parse import urlparse

from azure.storage.blob import BlobServiceClient

from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing import SpanKind
from azure.storage.queue import QueueServiceClient, TextBase64EncodePolicy

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data._telemetry import MonitoredActivity
from azure.kusto.data.exceptions import KustoClosedError, KustoServiceError

from ._ingest_telemetry import IngestTracingAttributes
from ._resource_manager import _ResourceManager, _ResourceUri
from .base_ingest_client import BaseIngestClient, IngestionResult, IngestionStatus
from .descriptors import BlobDescriptor, FileDescriptor, StreamDescriptor
from .exceptions import KustoInvalidEndpointError, KustoQueueError
from azure.kusto.data.exceptions import KustoBlobError
from .ingestion_blob_info import IngestionBlobInfo
from .ingestion_properties import IngestionProperties


class QueuedIngestClient(BaseIngestClient):
    """
    Queued ingest client provides methods to allow queued ingestion into kusto (ADX).
    To learn more about the different types of ingestions and when to use each, visit:
    https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods

    Files and streams are first uploaded to one of the blob containers returned by the resource manager;
    the resulting blob is then described in a message that is sent to one of the ingestion queues.
    """

    _INGEST_PREFIX = "ingest-"
    # Timeout, in seconds, passed to the blob upload and queue send calls.
    _SERVICE_CLIENT_TIMEOUT_SECONDS = 10 * 60
    # Upper bound on how many storage resources (containers / queues) are tried per operation.
    _MAX_RETRIES = 3

    def __init__(self, kcsb: Union[str, KustoConnectionStringBuilder], auto_correct_endpoint: bool = True):
        """Kusto Ingest Client constructor.
        :param kcsb: The connection string to initialize KustoClient.
        :param auto_correct_endpoint: When True, the data source of the connection string is replaced by
            the value returned from BaseIngestClient.get_ingestion_endpoint.
        """
        super().__init__()
        if not isinstance(kcsb, KustoConnectionStringBuilder):
            kcsb = KustoConnectionStringBuilder(kcsb)

        if auto_correct_endpoint:
            kcsb["Data Source"] = BaseIngestClient.get_ingestion_endpoint(kcsb.data_source)

        self._proxy_dict: Optional[Dict[str, str]] = None
        self._connection_datasource = kcsb.data_source
        self._resource_manager = _ResourceManager(KustoClient(kcsb))
        self._endpoint_service_type = None
        self._suggested_endpoint_uri = None
        self.application_for_tracing = kcsb.client_details.application_for_tracing
        self.client_version_for_tracing = kcsb.client_details.version_for_tracing

    def close(self) -> None:
        """Close the resource manager, then let the base class finish closing the client."""
        self._resource_manager.close()
        super().close()

    def set_proxy(self, proxy_url: str):
        """Use the given proxy for the resource manager and for the storage clients created by this client.
        :param proxy_url: proxy url, applied to both http and https traffic.
        """
        self._resource_manager.set_proxy(proxy_url)
        self._proxy_dict = {"http": proxy_url, "https": proxy_url}

    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_file", kind=SpanKind.CLIENT)
    def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Enqueue an ingest command from local files.
        To learn more about ingestion methods go to:
        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
        :param file_descriptor: a FileDescriptor to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :return: the result of enqueuing the uploaded blob (see ingest_from_blob).
        """
        file_descriptor = FileDescriptor.get_instance(file_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)

        # NOTE(review): the base implementation presumably validates the client state - confirm in BaseIngestClient.
        super().ingest_from_file(file_descriptor, ingestion_properties)

        containers = self._get_containers()

        file_descriptor, should_compress = BaseIngestClient._prepare_file(file_descriptor, ingestion_properties)
        # The stream is only valid inside the `with`, so the upload must finish before leaving it.
        with file_descriptor.open(should_compress) as stream:
            blob_descriptor = self.upload_blob(
                containers,
                file_descriptor,
                ingestion_properties.database,
                ingestion_properties.table,
                stream,
                self._proxy_dict,
                self._SERVICE_CLIENT_TIMEOUT_SECONDS,
                self._MAX_RETRIES,
            )
        return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)

    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_stream", kind=SpanKind.CLIENT)
    def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
        """Ingest from io streams.
        :param stream_descriptor: An object that contains a description of the stream to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :return: the result of enqueuing the uploaded blob (see ingest_from_blob).
        """
        stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
        IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)

        # NOTE(review): the base implementation presumably validates the client state - confirm in BaseIngestClient.
        super().ingest_from_stream(stream_descriptor, ingestion_properties)

        containers = self._get_containers()

        stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
        blob_descriptor = self.upload_blob(
            containers,
            stream_descriptor,
            ingestion_properties.database,
            ingestion_properties.table,
            stream_descriptor.stream,
            self._proxy_dict,
            self._SERVICE_CLIENT_TIMEOUT_SECONDS,
            self._MAX_RETRIES,
        )
        return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)

    @distributed_trace(name_of_span="QueuedIngestClient.ingest_from_blob", kind=SpanKind.CLIENT)
    def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
        """Enqueue an ingest command from azure blobs.
        To learn more about ingestion methods go to:
        https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
        :param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
        :param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
        :return: an IngestionResult with status QUEUED for the first queue that accepted the message.
        :raises KustoClosedError: if the client was already closed.
        :raises KustoQueueError: if no ingestion queue is available, or the retry budget
            (at most _MAX_RETRIES different queues) is exhausted.
        """
        IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)

        if self._is_closed:
            raise KustoClosedError()

        queues = self._resource_manager.get_ingestion_queues()
        if not queues:
            # Without this guard the loop below is skipped and the method silently returns None.
            raise KustoQueueError()

        authorization_context = self._resource_manager.get_authorization_context()
        ingestion_blob_info = IngestionBlobInfo(
            blob_descriptor,
            ingestion_properties=ingestion_properties,
            auth_context=authorization_context,
            application_for_tracing=self.application_for_tracing,
            client_version_for_tracing=self.client_version_for_tracing,
        )
        ingestion_blob_info_json = ingestion_blob_info.to_json()
        # Each failed queue consumes one retry; never allow more retries than there are queues.
        retries_left = min(self._MAX_RETRIES, len(queues))
        for queue in queues:
            try:
                with QueueServiceClient(queue.account_uri, proxies=self._proxy_dict) as queue_service:
                    with queue_service.get_queue_client(queue=queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client:
                        # trace enqueuing of blob for ingestion
                        invoker = lambda: queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS)
                        enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id)
                        MonitoredActivity.invoke(invoker, name_of_span="QueuedIngestClient.enqueue_request", tracing_attributes=enqueue_trace_attributes)

                self._resource_manager.report_resource_usage_result(queue.storage_account_name, True)
                return IngestionResult(
                    IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path
                )
            except Exception as e:
                retries_left = retries_left - 1
                # TODO: log the retry once we have a proper logging system
                self._resource_manager.report_resource_usage_result(queue.storage_account_name, False)
                # `<=` (not `==`) so a non-positive budget cannot skip past zero and fall out of the loop.
                if retries_left <= 0:
                    raise KustoQueueError() from e

    def _get_containers(self) -> List[_ResourceUri]:
        """Return the blob containers supplied by the resource manager."""
        return self._resource_manager.get_containers()

    def upload_blob(
        self,
        containers: List[_ResourceUri],
        descriptor: Union[FileDescriptor, "StreamDescriptor"],
        database: str,
        table: str,
        stream: IO[AnyStr],
        proxy_dict: Optional[Dict[str, str]],
        timeout: int,
        max_retries: int,
    ) -> "BlobDescriptor":
        """
        Uploads and transforms FileDescriptor or StreamDescriptor into a BlobDescriptor instance
        :param List[_ResourceUri] containers: blob containers
        :param Union[FileDescriptor, "StreamDescriptor"] descriptor:
        :param string database: database to be ingested to
        :param string table: table to be ingested to
        :param IO[AnyStr] stream: stream to be ingested from
        :param Optional[Dict[str, str]] proxy_dict: proxy urls
        :param int timeout: Azure service call timeout in seconds
        :param int max_retries: maximum number of containers to try before giving up
        :return new BlobDescriptor instance
        :raises KustoBlobError: if there are no containers, the stream cannot be rewound for a retry,
            or the retry budget is exhausted.
        """
        if not containers:
            # Without this guard the loop below is skipped and the method silently returns None.
            raise KustoBlobError(ValueError("No blob containers are available for upload"))

        blob_name = "{db}__{table}__{guid}__{file}".format(db=database, table=table, guid=descriptor.source_id, file=descriptor.stream_name)

        # Remember where the payload starts so a retry re-sends all of it; a failed attempt may
        # already have consumed part of the stream. Non-seekable streams are retried as-is.
        try:
            start_position = stream.tell() if stream.seekable() else None
        except (AttributeError, OSError, ValueError):
            start_position = None

        retries_left = min(max_retries, len(containers))
        for attempt, container in enumerate(containers):
            if attempt > 0 and start_position is not None:
                try:
                    stream.seek(start_position)
                except (OSError, ValueError) as e:
                    # Not the container's fault, so fail without reporting it as a bad resource.
                    raise KustoBlobError(e) from e
            try:
                # Context manager releases the client's transport, matching the queue client handling.
                with BlobServiceClient(container.account_uri, proxies=proxy_dict) as blob_service:
                    blob_client = blob_service.get_blob_client(container=container.object_name, blob=blob_name)
                    blob_client.upload_blob(data=stream, timeout=timeout)
                    blob_url = blob_client.url
                self._resource_manager.report_resource_usage_result(container.storage_account_name, True)
                return BlobDescriptor(blob_url, descriptor.size, descriptor.source_id)
            except Exception as e:
                retries_left = retries_left - 1
                # TODO: log the retry once we have a proper logging system
                self._resource_manager.report_resource_usage_result(container.storage_account_name, False)
                # `<=` (not `==`) so a non-positive max_retries cannot skip past zero and fall out of the loop.
                if retries_left <= 0:
                    raise KustoBlobError(e) from e