1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License
import json
import re
import unittest
from uuid import UUID
from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import (
BlobDescriptor,
IngestionProperties,
ColumnMapping,
ReportLevel,
ReportMethod,
ValidationPolicy,
ValidationOptions,
ValidationImplications,
)
from azure.kusto.ingest.ingestion_blob_info import IngestionBlobInfo
# Pattern for an ISO-8601 style timestamp with six fractional-second digits,
# e.g. "2020-01-31T12:34:56.123456". The dot is escaped so that only a literal
# "." is accepted between the seconds and the microseconds.
TIMESTAMP_REGEX = r"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{6}"
class IngestionBlobInfoTest(unittest.TestCase):
    """Tests serialization of ingestion blob info. This serialization will be queued to the DM."""

    def _build_properties(self, data_format, **mapping_kwargs):
        """Build the IngestionProperties shared by every test in this class.

        :param data_format: the ``DataFormat`` member to ingest with.
        :param mapping_kwargs: the mapping-related keyword arguments that differ
            between tests (``column_mappings`` or ``ingestion_mapping_reference``);
            they are forwarded unchanged to ``IngestionProperties``.
        :return: an ``IngestionProperties`` instance with the fixed test values.
        """
        validation_policy = ValidationPolicy(
            ValidationOptions.ValidateCsvInputConstantColumns,
            ValidationImplications.BestEffort,
        )
        return IngestionProperties(
            database="database",
            table="table",
            data_format=data_format,
            additional_tags=["tag"],
            ingest_if_not_exists=["ingestIfNotExistTags"],
            ingest_by_tags=["ingestByTags"],
            drop_by_tags=["dropByTags"],
            flush_immediately=True,
            report_level=ReportLevel.DoNotReport,
            report_method=ReportMethod.Queue,
            validation_policy=validation_policy,
            **mapping_kwargs,
        )

    def _serialize_and_verify(self, properties):
        """Serialize a blob info built from *properties* and verify the JSON output."""
        blob = BlobDescriptor("somepath", 10)
        blob_info = IngestionBlobInfo(blob, properties, auth_context="authorizationContextText")
        self._verify_ingestion_blob_info_result(blob_info.to_json())

    def test_blob_info_csv_mapping(self):
        """Tests serialization of csv ingestion blob info."""
        column_mapping = ColumnMapping("ColumnName", "cslDataType", ordinal=1)
        properties = self._build_properties(DataFormat.CSV, column_mappings=[column_mapping])
        self._serialize_and_verify(properties)

    def test_blob_csv_mapping_reference(self):
        """Tests serialization of ingestion blob info with csv mapping reference."""
        properties = self._build_properties(DataFormat.CSV, ingestion_mapping_reference="csvMappingReference")
        self._serialize_and_verify(properties)

    def test_blob_info_json_mapping(self):
        """Tests serialization of json ingestion blob info."""
        column_mapping = ColumnMapping("ColumnName", "datatype", path="jsonpath")
        properties = self._build_properties(DataFormat.JSON, column_mappings=[column_mapping])
        self._serialize_and_verify(properties)

    def test_blob_json_mapping_reference(self):
        """Tests serialization of ingestion blob info with json mapping reference."""
        properties = self._build_properties(DataFormat.JSON, ingestion_mapping_reference="jsonMappingReference")
        self._serialize_and_verify(properties)

    def _verify_ingestion_blob_info_result(self, ingestion_blob_info):
        """Check the JSON produced for an ingestion blob info.

        unittest assertion methods are used instead of bare ``assert`` so the
        checks are not stripped under ``python -O`` and failures report the
        offending values.

        :param ingestion_blob_info: the serialized blob info as a JSON string.
        :raises AssertionError: if any expected field is wrong.
        """
        result = json.loads(ingestion_blob_info)
        self.assertIsNotNone(result)
        self.assertIsInstance(result, dict)

        # Values taken directly from the blob descriptor and ingestion properties.
        self.assertEqual(result["BlobPath"], "somepath")
        self.assertEqual(result["DatabaseName"], "database")
        self.assertEqual(result["TableName"], "table")

        # Only the JSON types of these fields are checked, not their values.
        self.assertIsInstance(result["RawDataSize"], int)
        self.assertIsInstance(result["IgnoreSizeLimit"], bool)
        self.assertIsInstance(result["FlushImmediately"], bool)
        self.assertIsInstance(result["RetainBlobOnSuccess"], bool)
        self.assertIsInstance(result["ReportMethod"], int)
        self.assertIsInstance(result["ReportLevel"], int)

        # UUID() raises ValueError when the Id is not a well-formed UUID string.
        self.assertIsInstance(UUID(result["Id"]), UUID)
        # re.match (anchored at the start) is kept on purpose; assertRegex would use re.search.
        self.assertIsNotNone(re.match(TIMESTAMP_REGEX, result["SourceMessageCreationTime"]))

        additional_properties = result["AdditionalProperties"]
        self.assertEqual(additional_properties["authorizationContext"], "authorizationContextText")
        self.assertEqual(additional_properties["ingestIfNotExists"], '["ingestIfNotExistTags"]')
        # Two textual forms are accepted: numeric enum values, or a form in which
        # the enum member names appear in place of the numbers.
        # NOTE(review): presumably the second form tolerates a serializer variant - confirm.
        self.assertIn(
            additional_properties["ValidationPolicy"],
            (
                '{"ValidationImplications":1,"ValidationOptions":1}',
                '{"ValidationImplications":ValidationImplications.BestEffort,"ValidationOptions":ValidationOptions.ValidateCsvInputConstantColumns}',
            ),
        )
        self.assertEqual(additional_properties["tags"], '["tag","drop-by:dropByTags","ingest-by:ingestByTags"]')
|