1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
|
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import random
from typing import Union, AnyStr, IO, List, Optional, Dict
from urllib.parse import urlparse
from azure.storage.blob import BlobServiceClient
from azure.core.tracing.decorator import distributed_trace
from azure.core.tracing import SpanKind
from azure.storage.queue import QueueServiceClient, TextBase64EncodePolicy
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data._telemetry import MonitoredActivity
from azure.kusto.data.exceptions import KustoClosedError, KustoServiceError
from ._ingest_telemetry import IngestTracingAttributes
from ._resource_manager import _ResourceManager, _ResourceUri
from .base_ingest_client import BaseIngestClient, IngestionResult, IngestionStatus
from .descriptors import BlobDescriptor, FileDescriptor, StreamDescriptor
from .exceptions import KustoInvalidEndpointError, KustoQueueError
from azure.kusto.data.exceptions import KustoBlobError
from .ingestion_blob_info import IngestionBlobInfo
from .ingestion_properties import IngestionProperties
class QueuedIngestClient(BaseIngestClient):
"""
Queued ingest client provides methods to allow queued ingestion into kusto (ADX).
To learn more about the different types of ingestions and when to use each, visit:
https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
"""
_INGEST_PREFIX = "ingest-"
_SERVICE_CLIENT_TIMEOUT_SECONDS = 10 * 60
_MAX_RETRIES = 3
def __init__(self, kcsb: Union[str, KustoConnectionStringBuilder], auto_correct_endpoint: bool = True):
"""Kusto Ingest Client constructor.
:param kcsb: The connection string to initialize KustoClient.
"""
super().__init__()
if not isinstance(kcsb, KustoConnectionStringBuilder):
kcsb = KustoConnectionStringBuilder(kcsb)
if auto_correct_endpoint:
kcsb["Data Source"] = BaseIngestClient.get_ingestion_endpoint(kcsb.data_source)
self._proxy_dict: Optional[Dict[str, str]] = None
self._connection_datasource = kcsb.data_source
self._resource_manager = _ResourceManager(KustoClient(kcsb))
self._endpoint_service_type = None
self._suggested_endpoint_uri = None
self.application_for_tracing = kcsb.client_details.application_for_tracing
self.client_version_for_tracing = kcsb.client_details.version_for_tracing
def close(self) -> None:
self._resource_manager.close()
super().close()
def set_proxy(self, proxy_url: str):
self._resource_manager.set_proxy(proxy_url)
self._proxy_dict = {"http": proxy_url, "https": proxy_url}
@distributed_trace(name_of_span="QueuedIngestClient.ingest_from_file", kind=SpanKind.CLIENT)
def ingest_from_file(self, file_descriptor: Union[FileDescriptor, str], ingestion_properties: IngestionProperties) -> IngestionResult:
"""Enqueue an ingest command from local files.
To learn more about ingestion methods go to:
https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
:param file_descriptor: a FileDescriptor to be ingested.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
file_descriptor = FileDescriptor.get_instance(file_descriptor)
IngestTracingAttributes.set_ingest_descriptor_attributes(file_descriptor, ingestion_properties)
super().ingest_from_file(file_descriptor, ingestion_properties)
containers = self._get_containers()
file_descriptor, should_compress = BaseIngestClient._prepare_file(file_descriptor, ingestion_properties)
with file_descriptor.open(should_compress) as stream:
blob_descriptor = self.upload_blob(
containers,
file_descriptor,
ingestion_properties.database,
ingestion_properties.table,
stream,
self._proxy_dict,
self._SERVICE_CLIENT_TIMEOUT_SECONDS,
self._MAX_RETRIES,
)
return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)
@distributed_trace(name_of_span="QueuedIngestClient.ingest_from_stream", kind=SpanKind.CLIENT)
def ingest_from_stream(self, stream_descriptor: Union[StreamDescriptor, IO[AnyStr]], ingestion_properties: IngestionProperties) -> IngestionResult:
"""Ingest from io streams.
:param stream_descriptor: An object that contains a description of the stream to be ingested.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
stream_descriptor = StreamDescriptor.get_instance(stream_descriptor)
IngestTracingAttributes.set_ingest_descriptor_attributes(stream_descriptor, ingestion_properties)
super().ingest_from_stream(stream_descriptor, ingestion_properties)
containers = self._get_containers()
stream_descriptor = BaseIngestClient._prepare_stream(stream_descriptor, ingestion_properties)
blob_descriptor = self.upload_blob(
containers,
stream_descriptor,
ingestion_properties.database,
ingestion_properties.table,
stream_descriptor.stream,
self._proxy_dict,
self._SERVICE_CLIENT_TIMEOUT_SECONDS,
self._MAX_RETRIES,
)
return self.ingest_from_blob(blob_descriptor, ingestion_properties=ingestion_properties)
@distributed_trace(name_of_span="QueuedIngestClient.ingest_from_blob", kind=SpanKind.CLIENT)
def ingest_from_blob(self, blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties) -> IngestionResult:
"""Enqueue an ingest command from azure blobs.
To learn more about ingestion methods go to:
https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-overview#ingestion-methods
:param azure.kusto.ingest.BlobDescriptor blob_descriptor: An object that contains a description of the blob to be ingested.
:param azure.kusto.ingest.IngestionProperties ingestion_properties: Ingestion properties.
"""
IngestTracingAttributes.set_ingest_descriptor_attributes(blob_descriptor, ingestion_properties)
if self._is_closed:
raise KustoClosedError()
queues = self._resource_manager.get_ingestion_queues()
authorization_context = self._resource_manager.get_authorization_context()
ingestion_blob_info = IngestionBlobInfo(
blob_descriptor,
ingestion_properties=ingestion_properties,
auth_context=authorization_context,
application_for_tracing=self.application_for_tracing,
client_version_for_tracing=self.client_version_for_tracing,
)
ingestion_blob_info_json = ingestion_blob_info.to_json()
retries_left = min(self._MAX_RETRIES, len(queues))
for queue in queues:
try:
with QueueServiceClient(queue.account_uri, proxies=self._proxy_dict) as queue_service:
with queue_service.get_queue_client(queue=queue.object_name, message_encode_policy=TextBase64EncodePolicy()) as queue_client:
# trace enqueuing of blob for ingestion
invoker = lambda: queue_client.send_message(content=ingestion_blob_info_json, timeout=self._SERVICE_CLIENT_TIMEOUT_SECONDS)
enqueue_trace_attributes = IngestTracingAttributes.create_enqueue_request_attributes(queue_client.queue_name, blob_descriptor.source_id)
MonitoredActivity.invoke(invoker, name_of_span="QueuedIngestClient.enqueue_request", tracing_attributes=enqueue_trace_attributes)
self._resource_manager.report_resource_usage_result(queue.storage_account_name, True)
return IngestionResult(
IngestionStatus.QUEUED, ingestion_properties.database, ingestion_properties.table, blob_descriptor.source_id, blob_descriptor.path
)
except Exception as e:
retries_left = retries_left - 1
# TODO: log the retry once we have a proper logging system
self._resource_manager.report_resource_usage_result(queue.storage_account_name, False)
if retries_left == 0:
raise KustoQueueError() from e
def _get_containers(self) -> List[_ResourceUri]:
return self._resource_manager.get_containers()
def upload_blob(
self,
containers: List[_ResourceUri],
descriptor: Union[FileDescriptor, "StreamDescriptor"],
database: str,
table: str,
stream: IO[AnyStr],
proxy_dict: Optional[Dict[str, str]],
timeout: int,
max_retries: int,
) -> "BlobDescriptor":
"""
Uploads and transforms FileDescriptor or StreamDescriptor into a BlobDescriptor instance
:param List[_ResourceUri] containers: blob containers
:param Union[FileDescriptor, "StreamDescriptor"] descriptor:
:param string database: database to be ingested to
:param string table: table to be ingested to
:param IO[AnyStr] stream: stream to be ingested from
:param Optional[Dict[str, str]] proxy_dict: proxy urls
:param int timeout: Azure service call timeout in seconds
:return new BlobDescriptor instance
"""
blob_name = "{db}__{table}__{guid}__{file}".format(db=database, table=table, guid=descriptor.source_id, file=descriptor.stream_name)
retries_left = min(max_retries, len(containers))
for container in containers:
try:
blob_service = BlobServiceClient(container.account_uri, proxies=proxy_dict)
blob_client = blob_service.get_blob_client(container=container.object_name, blob=blob_name)
blob_client.upload_blob(data=stream, timeout=timeout)
self._resource_manager.report_resource_usage_result(container.storage_account_name, True)
return BlobDescriptor(blob_client.url, descriptor.size, descriptor.source_id)
except Exception as e:
retries_left = retries_left - 1
# TODO: log the retry once we have a proper logging system
self._resource_manager.report_resource_usage_result(container.storage_account_name, False)
if retries_left == 0:
raise KustoBlobError(e)
|