1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import json
from ._status_q import StatusQueue
class StatusMessage:
OperationId = None
Database = None
Table = None
IngestionSourceId = None
IngestionSourcePath = None
RootActivityId = None
_raw = None
def __init__(self, s):
self._raw = s
o = json.loads(s)
for key, value in o.items():
if hasattr(self, key):
try:
setattr(self, key, value)
except:
# TODO: should we set up a logger?
pass
def __str__(self):
return "{}".format(self._raw)
def __repr__(self):
return "{0.__class__.__name__}({0._raw})".format(self)
class SuccessMessage(StatusMessage):
SucceededOn = None
class FailureMessage(StatusMessage):
FailedOn = None
Details = None
ErrorCode = None
FailureStatus = None
OriginatesFromUpdatePolicy = None
ShouldRetry = None
class KustoIngestStatusQueues:
"""Kusto ingest Status Queue.
Use this class to get status messages from Kusto status queues.
Currently there are two queues exposed: `failure` and `success` queues.
"""
def __init__(self, kusto_ingest_client):
self.success = StatusQueue(kusto_ingest_client._resource_manager.get_successful_ingestions_queues, message_cls=SuccessMessage)
self.failure = StatusQueue(kusto_ingest_client._resource_manager.get_failed_ingestions_queues, message_cls=FailureMessage)
|