File: test_ingestion_blob_info.py

package info (click to toggle)
azure-kusto-python 5.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,704 kB
  • sloc: python: 10,633; sh: 13; makefile: 3
file content (135 lines) | stat: -rw-r--r-- 6,369 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import json
import re
import unittest
from uuid import UUID

from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import (
    BlobDescriptor,
    IngestionProperties,
    ColumnMapping,
    ReportLevel,
    ReportMethod,
    ValidationPolicy,
    ValidationOptions,
    ValidationImplications,
)
from azure.kusto.ingest.ingestion_blob_info import IngestionBlobInfo

# Pattern for a timestamp of the form YYYY-MM-DDTHH:MM:SS.ffffff (six fractional digits).
# Written as a raw string with the "." escaped so that only a literal dot is accepted
# between the seconds and the fractional part (an unescaped "." matches any character).
TIMESTAMP_REGEX = r"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{6}"


class IngestionBlobInfoTest(unittest.TestCase):
    """Tests serialization of ingestion blob info. This serialization will be queued to the DM.

    Every test builds the same ingestion properties (only the data format and the
    mapping argument differ), serializes an ``IngestionBlobInfo`` to JSON and checks
    the resulting document with ``_verify_ingestion_blob_info_result``.
    """

    def _serialize_blob_info(self, data_format, **mapping_kwargs):
        """Build the shared fixture and return the blob info serialized as JSON.

        :param data_format: ``DataFormat`` member passed to ``IngestionProperties``.
        :param mapping_kwargs: the mapping-related keyword argument for
            ``IngestionProperties`` (``column_mappings`` or ``ingestion_mapping_reference``).
        :return: the value returned by ``IngestionBlobInfo.to_json()``.
        """
        validation_policy = ValidationPolicy(ValidationOptions.ValidateCsvInputConstantColumns, ValidationImplications.BestEffort)
        properties = IngestionProperties(
            database="database",
            table="table",
            data_format=data_format,
            additional_tags=["tag"],
            ingest_if_not_exists=["ingestIfNotExistTags"],
            ingest_by_tags=["ingestByTags"],
            drop_by_tags=["dropByTags"],
            flush_immediately=True,
            report_level=ReportLevel.DoNotReport,
            report_method=ReportMethod.Queue,
            validation_policy=validation_policy,
            **mapping_kwargs
        )
        blob = BlobDescriptor("somepath", 10)
        blob_info = IngestionBlobInfo(blob, properties, auth_context="authorizationContextText")
        return blob_info.to_json()

    def test_blob_info_csv_mapping(self):
        """Tests serialization of csv ingestion blob info."""
        column_mapping = ColumnMapping("ColumnName", "cslDataType", ordinal=1)
        serialized = self._serialize_blob_info(DataFormat.CSV, column_mappings=[column_mapping])
        self._verify_ingestion_blob_info_result(serialized)

    def test_blob_csv_mapping_reference(self):
        """Tests serialization of ingestion blob info with csv mapping reference."""
        serialized = self._serialize_blob_info(DataFormat.CSV, ingestion_mapping_reference="csvMappingReference")
        self._verify_ingestion_blob_info_result(serialized)

    def test_blob_info_json_mapping(self):
        """Tests serialization of json ingestion blob info."""
        column_mapping = ColumnMapping("ColumnName", "datatype", path="jsonpath")
        serialized = self._serialize_blob_info(DataFormat.JSON, column_mappings=[column_mapping])
        self._verify_ingestion_blob_info_result(serialized)

    def test_blob_json_mapping_reference(self):
        """Tests serialization of ingestion blob info with json mapping reference."""
        serialized = self._serialize_blob_info(DataFormat.JSON, ingestion_mapping_reference="jsonMappingReference")
        self._verify_ingestion_blob_info_result(serialized)

    def _verify_ingestion_blob_info_result(self, ingestion_blob_info):
        """Check the serialized blob info against the values used by the tests in this class.

        Uses ``unittest`` assertion methods rather than bare ``assert`` statements so the
        checks still run under ``python -O`` and report the offending values on failure.

        :param ingestion_blob_info: JSON text of the serialized blob info.
        """
        result = json.loads(ingestion_blob_info)
        self.assertIsNotNone(result)
        self.assertIsInstance(result, dict)

        # Identity of the blob and its destination.
        self.assertEqual(result["BlobPath"], "somepath")
        self.assertEqual(result["DatabaseName"], "database")
        self.assertEqual(result["TableName"], "table")

        # Only the JSON types of these fields are checked, not their values.
        self.assertIsInstance(result["RawDataSize"], int)
        self.assertIsInstance(result["IgnoreSizeLimit"], bool)
        self.assertIsInstance(result["FlushImmediately"], bool)
        self.assertIsInstance(result["RetainBlobOnSuccess"], bool)
        self.assertIsInstance(result["ReportMethod"], int)
        self.assertIsInstance(result["ReportLevel"], int)

        # UUID() raises ValueError when "Id" is not a well-formed UUID string; that
        # parse is the real check here.
        self.assertIsInstance(UUID(result["Id"]), UUID)
        # re.match anchors at the start only, so trailing characters are tolerated.
        self.assertIsNotNone(re.match(TIMESTAMP_REGEX, result["SourceMessageCreationTime"]))

        # Values inside AdditionalProperties are compared as strings (some hold JSON text).
        additional_properties = result["AdditionalProperties"]
        self.assertEqual(additional_properties["authorizationContext"], "authorizationContextText")
        self.assertEqual(additional_properties["ingestIfNotExists"], '["ingestIfNotExistTags"]')
        # Two textual forms of the validation policy are accepted.
        self.assertIn(
            additional_properties["ValidationPolicy"],
            (
                '{"ValidationImplications":1,"ValidationOptions":1}',
                '{"ValidationImplications":ValidationImplications.BestEffort,"ValidationOptions":ValidationOptions.ValidateCsvInputConstantColumns}',
            ),
        )

        self.assertEqual(additional_properties["tags"], '["tag","drop-by:dropByTags","ingest-by:ingestByTags"]')