# Tests for the azure.kusto.data telemetry helpers (MonitoredActivity and Span).
import pytest
from azure.kusto.data._telemetry import MonitoredActivity, Span
from azure.kusto.data.client_request_properties import ClientRequestProperties
def test_run_none_invoker():
    """Edge case: ``MonitoredActivity.invoke`` is expected to raise ``TypeError`` when the invoker is ``None``."""
    missing_invoker = None
    with pytest.raises(TypeError):
        MonitoredActivity.invoke(missing_invoker, "test_span")
@pytest.mark.asyncio
async def test_run_async_valid_invoker():
    """Happy path: ``invoke_async`` with a coroutine function and a span name is expected to yield the coroutine's result."""

    async def invoker():
        return "Hello World"

    outcome = await MonitoredActivity.invoke_async(invoker, "test_span")
    assert outcome == "Hello World"
def test_run_valid_invoker():
    """Happy path: ``invoke`` with a plain function and a span name is expected to yield the function's result."""

    def invoker():
        return "Hello World"

    outcome = MonitoredActivity.invoke(invoker, "test_span")
    assert outcome == "Hello World"
@pytest.mark.asyncio
async def test_run_async_none_invoker():
    """Edge case: awaiting ``invoke_async`` with ``None`` as the invoker is expected to raise ``TypeError``."""
    missing_invoker = None
    with pytest.raises(TypeError):
        await MonitoredActivity.invoke_async(missing_invoker, "test_span")
def test_run_sync_behavior():
    """General behavior: ``invoke`` runs the wrapped function synchronously and the call evaluates to its return value."""

    def invoker():
        return "Hello World"

    returned = MonitoredActivity.invoke(invoker, "test_span")
    assert returned == "Hello World"
@pytest.mark.asyncio
async def test_run_async_behavior():
    """General behavior: awaiting ``invoke_async`` evaluates to the wrapped coroutine's return value."""

    async def invoker():
        return "Hello World"

    returned = await MonitoredActivity.invoke_async(invoker, "test_span")
    assert returned == "Hello World"
def test_tracing_attributes_parameter():
    """``invoke`` called with ``tracing_attributes`` only (no span name) is expected to yield the invoker's result."""

    def invoker():
        return "Hello World"

    outcome = MonitoredActivity.invoke(invoker, tracing_attributes={"key": "value"})
    assert outcome == "Hello World"
def test_get_client_request_properties_attributes():
    """Tracing attributes of a default ``ClientRequestProperties`` must be a dict of strings keyed exactly by ``client_request_id``."""
    tracing = ClientRequestProperties().get_tracing_attributes()
    expected_keys = {"client_request_id"}

    assert isinstance(tracing, dict)
    # No unexpected keys, and every value is a string.
    for name, value in tracing.items():
        assert name in expected_keys
        assert isinstance(value, str)
    # No expected key is missing.
    assert all(name in tracing for name in expected_keys)
def test_create_query_attributes():
    """``Span.create_query_attributes`` must produce a dict of strings containing the expected keys, with and without request properties."""

    def verify(produced, required):
        # Shared checks: dict type, string-only values, all required keys present.
        assert isinstance(produced, dict)
        assert all(isinstance(value, str) for value in produced.values())
        assert all(name in produced for name in required)

    # With client request properties, the request id is expected as well.
    verify(
        Span.create_query_attributes("cluster_test", "database_test", ClientRequestProperties()),
        {"kusto_cluster", "database", "client_request_id"},
    )
    # Without them, only cluster and database are required.
    verify(
        Span.create_query_attributes("cluster_test", "database_test"),
        {"kusto_cluster", "database"},
    )
def test_create_ingest_attributes():
    """``Span.create_streaming_ingest_attributes`` must produce a dict of strings containing the expected keys, with and without request properties."""

    def verify(produced, required):
        # Shared checks: dict type, string-only values, all required keys present.
        assert isinstance(produced, dict)
        assert all(isinstance(value, str) for value in produced.values())
        assert all(name in produced for name in required)

    # With client request properties, the request id is expected as well.
    verify(
        Span.create_streaming_ingest_attributes("cluster_test", "database_test", "table", ClientRequestProperties()),
        {"kusto_cluster", "database", "table", "client_request_id"},
    )
    # Without them, only cluster, database and table are required.
    verify(
        Span.create_streaming_ingest_attributes("cluster_test", "database_test", "table"),
        {"kusto_cluster", "database", "table"},
    )
def test_create_http_attributes():
    """``Span.create_http_attributes`` must return the exact HTTP attribute dict, adding the user agent when headers carry one."""
    base_expected = {"component": "http", "http.method": "method_test", "http.url": "url_test"}

    # No headers: only component, method and URL are expected.
    assert Span.create_http_attributes("method_test", "url_test") == base_expected

    # A User-Agent header is expected to surface as ``http.user_agent``.
    with_agent = Span.create_http_attributes("method_test", "url_test", {"User-Agent": "user_agent_test"})
    assert with_agent == {**base_expected, "http.user_agent": "user_agent_test"}
|