"""Tests for the asyncio KustoClient (azure.kusto.data.aio.client)."""
import json
import sys
from unittest.mock import patch
import pytest
from azure.kusto.data._cloud_settings import CloudSettings
from azure.kusto.data._decorators import aio_documented_by
from azure.kusto.data.client_request_properties import ClientRequestProperties
from azure.kusto.data.exceptions import KustoClosedError, KustoMultiApiError, KustoNetworkError
from azure.kusto.data.helpers import dataframe_from_result_table
from ..kusto_client_common import KustoClientTestsMixin, mocked_requests_post, proxy_kcsb
from ..test_kusto_client import TestKustoClient as KustoClientTestsSync
# Optional dependency probe: PANDAS gates the data-frame test via ``skipif``.
PANDAS = False
try:
    import pandas
except Exception:
    # Best effort: any import-time failure just leaves PANDAS False.
    # Unlike a bare ``except:``, KeyboardInterrupt and SystemExit still propagate.
    pass
else:
    PANDAS = True
# The aio tests need both the aio KustoClient and the aioresponses mocking
# library; when either import fails the whole test class is skipped.
run_aio_tests = False
try:
    from azure.kusto.data.aio.client import KustoClient
    from aioresponses import aioresponses, CallbackResult
except Exception:
    # Best effort: any import-time failure just leaves run_aio_tests False.
    # Unlike a bare ``except:``, KeyboardInterrupt and SystemExit still propagate.
    pass
else:
    run_aio_tests = True

# Never run the aio tests on interpreters older than 3.6, even if the imports worked.
if sys.version_info < (3, 6):
    run_aio_tests = False
# Async counterparts of the synchronous KustoClient tests. HTTP traffic is
# intercepted with aioresponses and answered through ``mocked_requests_post``.
@pytest.mark.skipif(not run_aio_tests, reason="requires aio")
@aio_documented_by(KustoClientTestsSync)
class TestKustoClient(KustoClientTestsMixin):
    @staticmethod
    def _mock_callback(url, **kwargs):
        """Build a 200 response whose body is the JSON returned by ``mocked_requests_post``."""
        body = json.dumps(mocked_requests_post(str(url), **kwargs).json())
        return CallbackResult(status=200, body=body)

    def _mock_query(self, aioresponses_mock):
        """Register a mocked POST on the v2 query endpoint of ``self.HOST``."""
        url = "{host}/v2/rest/query".format(host=self.HOST)
        aioresponses_mock.post(url, callback=self._mock_callback)

    def _mock_mgmt(self, aioresponses_mock):
        """Register a mocked POST on the v1 management endpoint of ``self.HOST``."""
        url = "{host}/v1/rest/mgmt".format(host=self.HOST)
        aioresponses_mock.post(url, callback=self._mock_callback)

    def _mock_cloud_info(self, aioresponses_mock):
        """Register a mocked GET on the auth metadata endpoint of ``self.HOST``."""
        url = "{host}/v1/rest/auth/metadata".format(host=self.HOST)
        aioresponses_mock.get(url, callback=self._mock_callback)

    @aio_documented_by(KustoClientTestsSync.test_throws_on_close)
    @pytest.mark.asyncio
    async def test_throws_on_close(self):
        """Querying after the client context has exited raises KustoClosedError."""
        async with KustoClient(self.HOST) as client:
            pass
        # The client is used only after its ``async with`` block has finished.
        with pytest.raises(KustoClosedError):
            await client.execute_query("PythonTest", "Deft")

    @aio_documented_by(KustoClientTestsSync.test_sanity_query)
    @pytest.mark.asyncio
    async def test_sanity_query(self):
        """Run a basic query and check the request id and the parsed response."""
        with aioresponses() as aioresponses_mock:
            self._mock_query(aioresponses_mock)
            async with KustoClient(self.HOST) as client:
                response = await client.execute_query("PythonTest", "Deft")
            first_request = next(iter(aioresponses_mock.requests.values()))
            self._assert_client_request_id(first_request[0].kwargs)
        self._assert_sanity_query_response(response)

    @pytest.mark.asyncio
    async def test_raise_network(self):
        """The "raiseNetwork" query must surface as KustoNetworkError."""
        with aioresponses() as aioresponses_mock:
            self._mock_query(aioresponses_mock)
            async with KustoClient(self.HOST) as client:
                with pytest.raises(KustoNetworkError):
                    # The result is irrelevant: the call is expected to raise.
                    await client.execute_query("PythonTest", "raiseNetwork")

    @aio_documented_by(KustoClientTestsSync.test_sanity_control_command)
    @pytest.mark.asyncio
    async def test_sanity_control_command(self):
        """Run a management command and check the parsed response."""
        with aioresponses() as aioresponses_mock:
            self._mock_mgmt(aioresponses_mock)
            async with KustoClient(self.HOST) as client:
                response = await client.execute_mgmt("NetDefaultDB", ".show version")
        self._assert_sanity_control_command_response(response)

    @pytest.mark.skipif(not PANDAS, reason="requires pandas")
    @aio_documented_by(KustoClientTestsSync.test_sanity_data_frame)
    @pytest.mark.asyncio
    async def test_sanity_data_frame(self):
        """Convert the primary result table of a query into a data frame and check it."""
        with aioresponses() as aioresponses_mock:
            self._mock_query(aioresponses_mock)
            async with KustoClient(self.HOST) as client:
                response = await client.execute_query("PythonTest", "Deft")
        data_frame = dataframe_from_result_table(response.primary_results[0])
        self._assert_sanity_data_frame_response(data_frame)

    @aio_documented_by(KustoClientTestsSync.test_partial_results)
    @pytest.mark.asyncio
    async def test_partial_results(self):
        """Check both settings of the defer-partial-query-failures option."""
        async with KustoClient(self.HOST) as client:
            query = "set truncationmaxrecords = 5;\nrange x from 1 to 10 step 1"
            properties = ClientRequestProperties()
            properties.set_option(ClientRequestProperties.results_defer_partial_query_failures_option_name, False)
            with aioresponses() as aioresponses_mock:
                # Option off: the query is expected to raise with a single API error.
                self._mock_query(aioresponses_mock)
                with pytest.raises(KustoMultiApiError) as e:
                    await client.execute_query("PythonTest", query, properties)
                errors = e.value.get_api_errors()
                assert len(errors) == 1
                assert errors[0].code == "LimitsExceeded"

                # Option on: the same query is expected to return a response.
                properties.set_option(ClientRequestProperties.results_defer_partial_query_failures_option_name, True)
                self._mock_query(aioresponses_mock)
                response = await client.execute_query("PythonTest", query, properties)
            self._assert_partial_results_response(response)

    @aio_documented_by(KustoClientTestsSync.test_admin_then_query)
    @pytest.mark.asyncio
    async def test_admin_then_query(self):
        """Run a management command piped into a query and check the response."""
        with aioresponses() as aioresponses_mock:
            self._mock_mgmt(aioresponses_mock)
            async with KustoClient(self.HOST) as client:
                query = ".show tables | project DatabaseName, TableName"
                response = await client.execute_mgmt("PythonTest", query)
        self._assert_admin_then_query_response(response)

    @aio_documented_by(KustoClientTestsSync.test_dynamic)
    @pytest.mark.asyncio
    async def test_dynamic(self):
        """Query several ``dynamic`` values and check the first row of the primary result."""
        with aioresponses() as aioresponses_mock:
            self._mock_query(aioresponses_mock)
            async with KustoClient(self.HOST) as client:
                # The parentheses make the two literals concatenate into one query.
                # Without them the second literal is a separate no-op statement
                # and only the first half of the query is sent.
                query = (
                    """print dynamic(123), dynamic("123"), dynamic("test bad json"),"""
                    """ dynamic(null), dynamic('{"rowId":2,"arr":[0,2]}'), dynamic({"rowId":2,"arr":[0,2]})"""
                )
                response = await client.execute_query("PythonTest", query)
        row = response.primary_results[0].rows[0]
        self._assert_dynamic_response(row)

    @aio_documented_by(KustoClientTestsSync.test_empty_result)
    @pytest.mark.asyncio
    async def test_empty_result(self):
        """A ``take 0`` query must still yield a truthy primary result table."""
        with aioresponses() as aioresponses_mock:
            self._mock_query(aioresponses_mock)
            async with KustoClient(self.HOST) as client:
                query = """print 'a' | take 0"""
                response = await client.execute_query("PythonTest", query)
        assert response.primary_results[0]

    @aio_documented_by(KustoClientTestsSync.test_null_values_in_data)
    @pytest.mark.asyncio
    async def test_null_values_in_data(self):
        """The "PrimaryResultName" query must return a response object."""
        with aioresponses() as aioresponses_mock:
            self._mock_query(aioresponses_mock)
            async with KustoClient(self.HOST) as client:
                query = "PrimaryResultName"
                response = await client.execute_query("PythonTest", query)
        assert response is not None

    # NOTE(review): documented by the sync ``test_sanity_query``; confirm whether
    # a sync ``test_request_id`` exists that should be referenced instead.
    @aio_documented_by(KustoClientTestsSync.test_sanity_query)
    @pytest.mark.asyncio
    async def test_request_id(self):
        """An explicit ``client_request_id`` must be the one sent with the request."""
        with aioresponses() as aioresponses_mock:
            properties = ClientRequestProperties()
            request_id = "test_request_id"
            properties.client_request_id = request_id
            self._mock_query(aioresponses_mock)
            async with KustoClient(self.HOST) as client:
                response = await client.execute_query("PythonTest", "Deft", properties=properties)
            first_request = next(iter(aioresponses_mock.requests.values()))
            self._assert_client_request_id(first_request[0].kwargs, value=request_id)
        self._assert_sanity_query_response(response)

    @aio_documented_by(KustoClientTestsSync.test_proxy_token_providers)
    @pytest.mark.asyncio
    async def test_proxy_token_providers(self, proxy_kcsb):
        """Setting a proxy on the client must reach the token provider when it supports proxies."""
        proxy = "https://my_proxy.sample"
        kcsb, auth_supports_proxy = proxy_kcsb
        async with KustoClient(kcsb) as client:
            client.set_proxy(proxy)

            assert client._proxy_url == proxy

            expected_dict = {"http": proxy, "https": proxy}
            if not auth_supports_proxy:
                # Nothing more to verify for providers without proxy support.
                return

            assert client._aad_helper.token_provider._proxy_dict == expected_dict

            # Clear the cached cloud settings before patching ``requests.get``.
            CloudSettings._cloud_cache.clear()
            with patch("requests.get", side_effect=mocked_requests_post) as mock_get:
                client._aad_helper.token_provider._init_resources()

                mock_get.assert_called_with("https://somecluster.kusto.windows.net/v1/rest/auth/metadata", proxies=expected_dict, allow_redirects=False)
|