File: test_ingest_telemetry.py

package info (click to toggle)
azure-kusto-python 5.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,704 kB
  • sloc: python: 10,633; sh: 13; makefile: 3
file content (26 lines) | stat: -rw-r--r-- 1,069 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from io import BytesIO

from azure.kusto.ingest import FileDescriptor, BlobDescriptor, StreamDescriptor
from azure.kusto.ingest.ingestion_properties import IngestionProperties


def test_get_tracing_attributes():
    """Check the tracing attributes exposed by ingestion properties and descriptors.

    ``IngestionProperties.get_tracing_attributes()`` must return exactly the
    database and table names it was constructed with.  For each descriptor
    kind (stream, blob, file) ``get_tracing_attributes()`` must return a dict
    whose key set is exactly the expected one and whose values are all strings.
    """
    ingestion_properties = IngestionProperties("database_test", "table_test")
    assert {"database": "database_test", "table": "table_test"} == ingestion_properties.get_tracing_attributes()

    dummy_path = "dummy"
    # Keep each descriptor next to the key set it is expected to expose, so the
    # two cannot drift out of step the way parallel index-matched lists can.
    expected_keys_by_descriptor = [
        (StreamDescriptor(BytesIO(b"dummy")), {"stream_name", "source_id"}),
        (BlobDescriptor(dummy_path), {"blob_uri", "source_id"}),
        (FileDescriptor(dummy_path), {"file_path", "source_id"}),
    ]

    for descriptor, expected_keys in expected_keys_by_descriptor:
        attributes = descriptor.get_tracing_attributes()
        assert isinstance(attributes, dict)
        # Set equality covers both directions at once: no unexpected keys are
        # present and no expected key is missing.
        assert set(attributes) == expected_keys
        for key, val in attributes.items():
            assert isinstance(val, str), f"tracing attribute {key!r} is not a string"