File: ingestion_blob_info.py

package info (click to toggle)
azure-kusto-python 5.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,704 kB
  • sloc: python: 10,633; sh: 13; makefile: 3
file content (81 lines) | stat: -rw-r--r-- 3,967 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import json
import uuid
from datetime import datetime
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from azure.kusto.ingest import BlobDescriptor, IngestionProperties


class IngestionBlobInfo:
    """Collects the settings for ingesting one blob into a JSON-serializable dict.

    The constructor reads a blob descriptor and an ingestion-properties object
    and stores the result in ``self.properties``; ``to_json`` serializes it.
    """

    def __init__(
        self,
        blob_descriptor: "BlobDescriptor",
        ingestion_properties: "IngestionProperties",
        auth_context=None,
        application_for_tracing=None,
        client_version_for_tracing=None,
    ) -> None:
        """Build ``self.properties`` from the descriptor and the properties.

        :param blob_descriptor: supplies ``path``, ``size`` and ``source_id``.
        :param ingestion_properties: supplies database/table, reporting
            settings, tags, mapping, validation policy, format and any
            caller-provided ``additional_properties``. It is only read; the
            caller's ``additional_properties`` dict is never modified.
        :param auth_context: stored under ``authorizationContext`` in the
            additional properties (``None`` is stored as-is).
        :param application_for_tracing: stored under ``ApplicationForTracing``.
        :param client_version_for_tracing: stored under ``ClientVersionForTracing``.
        """
        self.properties = {"BlobPath": blob_descriptor.path}
        # A size of 0/None is treated as "unknown" and the key is omitted.
        if blob_descriptor.size:
            self.properties["RawDataSize"] = blob_descriptor.size
        self.properties["DatabaseName"] = ingestion_properties.database
        self.properties["TableName"] = ingestion_properties.table
        self.properties["RetainBlobOnSuccess"] = True
        self.properties["FlushImmediately"] = ingestion_properties.flush_immediately
        self.properties["IgnoreSizeLimit"] = False
        self.properties["ReportLevel"] = ingestion_properties.report_level.value
        self.properties["ReportMethod"] = ingestion_properties.report_method.value
        self.properties["SourceMessageCreationTime"] = datetime.utcnow().isoformat()
        self.properties["Id"] = str(blob_descriptor.source_id)
        self.properties["ApplicationForTracing"] = application_for_tracing
        self.properties["ClientVersionForTracing"] = client_version_for_tracing

        # Work on a shallow copy: writing into the caller's dict would leak
        # per-ingestion keys (auth context, tags, mapping, ...) back into the
        # IngestionProperties object and into any later ingestion reusing it.
        additional_properties = dict(ingestion_properties.additional_properties or {})
        additional_properties["authorizationContext"] = auth_context

        # Tags are merged into one list; drop-by / ingest-by tags get a prefix.
        tags = list(ingestion_properties.additional_tags or [])
        tags.extend("drop-by:" + drop for drop in ingestion_properties.drop_by_tags or [])
        tags.extend("ingest-by:" + ingest for ingest in ingestion_properties.ingest_by_tags or [])
        if tags:
            additional_properties["tags"] = _convert_list_to_json(tags)
        if ingestion_properties.ingest_if_not_exists:
            additional_properties["ingestIfNotExists"] = _convert_list_to_json(ingestion_properties.ingest_if_not_exists)
        if ingestion_properties.ingestion_mapping:
            additional_properties["ingestionMapping"] = _convert_dict_to_json(ingestion_properties.ingestion_mapping)

        if ingestion_properties.ingestion_mapping_reference:
            additional_properties["ingestionMappingReference"] = ingestion_properties.ingestion_mapping_reference
        if ingestion_properties.ingestion_mapping_type:
            additional_properties["ingestionMappingType"] = ingestion_properties.ingestion_mapping_type.value
        if ingestion_properties.validation_policy:
            additional_properties["ValidationPolicy"] = _convert_dict_to_json(ingestion_properties.validation_policy)
        if ingestion_properties.format:
            additional_properties["format"] = ingestion_properties.format.kusto_value
        if ingestion_properties.ignore_first_record:
            additional_properties["ignoreFirstRecord"] = ingestion_properties.ignore_first_record

        # "authorizationContext" is always present, so the dict is never empty.
        self.properties["AdditionalProperties"] = additional_properties

    def to_json(self) -> str:
        """Return ``self.properties`` serialized as a compact JSON string."""
        return _convert_list_to_json(self.properties)


def _convert_list_to_json(array):
    """Converts array to a json string"""
    return json.dumps(array, skipkeys=False, allow_nan=False, indent=None, separators=(",", ":"))


def _convert_dict_to_json(array):
    """Converts array to a json string"""
    return json.dumps(array, skipkeys=False, allow_nan=False, indent=None, separators=(",", ":"), sort_keys=True, default=lambda o: o.__dict__)