1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
|
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
import os
import unittest
import uuid
import pytest
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.documents as documents
import test_config
from azure.cosmos.exceptions import CosmosHttpResponseError
from azure.cosmos.partition_key import PartitionKey
class _config:
host = test_config.TestConfig.host
master_key = test_config.TestConfig.masterKey
connection_policy = test_config.TestConfig.connectionPolicy
PARTITION_KEY = 'key'
UNIQUE_PARTITION_KEY = 'uniquePartitionKey'
FIELD = 'field'
DOCUMENTS_COUNT = 400
DOCS_WITH_SAME_PARTITION_KEY = 200
docs_with_numeric_id = 0
sum = 0
@pytest.mark.cosmosQuery
class TestAggregateQuery(unittest.TestCase):
client: cosmos_client.CosmosClient = None
@classmethod
def setUpClass(cls):
cls._all_tests = []
cls._setup()
cls._generate_test_configs()
@classmethod
def tearDownClass(cls) -> None:
try:
cls.created_db.delete_container(cls.created_collection.id)
except CosmosHttpResponseError:
pass
@classmethod
def _setup(cls):
if not _config.master_key or not _config.host:
raise Exception(
"You must specify your Azure Cosmos account values for "
"'masterKey' and 'host' at the top of this class to run the "
"tests.")
cls.client = cosmos_client.CosmosClient(_config.host, _config.master_key)
cls.created_db = cls.client.get_database_client(test_config.TestConfig.TEST_DATABASE_ID)
cls.created_collection = cls._create_collection(cls.created_db)
if _config.host == "https://localhost:8081/":
os.environ["AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY"] = "True"
# test documents
document_definitions = []
values = [None, False, True, "abc", "cdfg", "opqrs", "ttttttt", "xyz", "oo", "ppp"]
for value in values:
d = {_config.PARTITION_KEY: value, 'id': str(uuid.uuid4())}
document_definitions.append(d)
for i in range(_config.DOCS_WITH_SAME_PARTITION_KEY):
d = {_config.PARTITION_KEY: _config.UNIQUE_PARTITION_KEY,
'resourceId': i,
_config.FIELD: i + 1,
'id': str(uuid.uuid4())}
document_definitions.append(d)
_config.docs_with_numeric_id = \
_config.DOCUMENTS_COUNT - len(values) - _config.DOCS_WITH_SAME_PARTITION_KEY
for i in range(_config.docs_with_numeric_id):
d = {_config.PARTITION_KEY: i + 1, 'id': str(uuid.uuid4())}
document_definitions.append(d)
_config.sum = _config.docs_with_numeric_id \
* (_config.docs_with_numeric_id + 1) / 2.0
cls._insert_doc(cls.created_collection, document_definitions)
@classmethod
def _generate_test_configs(cls):
aggregate_query_format = 'SELECT VALUE {}(r.{}) FROM r WHERE {}'
aggregate_orderby_query_format = 'SELECT VALUE {}(r.{}) FROM r WHERE {} ORDER BY r.{}'
aggregate_configs = [
['AVG', _config.sum / _config.docs_with_numeric_id,
'IS_NUMBER(r.{})'.format(_config.PARTITION_KEY)],
['AVG', None, 'true'],
['COUNT', _config.DOCUMENTS_COUNT, 'true'],
['MAX', 'xyz', 'true'],
['MIN', None, 'true'],
['SUM', _config.sum, 'IS_NUMBER(r.{})'.format(_config.PARTITION_KEY)],
['SUM', None, 'true']
]
for operator, expected, condition in aggregate_configs:
cls._all_tests.append([
'{} {}'.format(operator, condition),
aggregate_query_format.format(operator, _config.PARTITION_KEY, condition),
expected])
cls._all_tests.append([
'{} {} OrderBy'.format(operator, condition),
aggregate_orderby_query_format.format(operator, _config.PARTITION_KEY, condition,
_config.PARTITION_KEY),
expected])
aggregate_single_partition_format = 'SELECT VALUE {}(r.{}) FROM r WHERE r.{} = \'{}\''
aggregate_orderby_single_partition_format = 'SELECT {}(r.{}) FROM r WHERE r.{} = \'{}\''
same_partiton_sum = _config.DOCS_WITH_SAME_PARTITION_KEY * (_config.DOCS_WITH_SAME_PARTITION_KEY + 1) / 2.0
aggregate_single_partition_configs = [
['AVG', same_partiton_sum / _config.DOCS_WITH_SAME_PARTITION_KEY],
['COUNT', _config.DOCS_WITH_SAME_PARTITION_KEY],
['MAX', _config.DOCS_WITH_SAME_PARTITION_KEY],
['MIN', 1],
['SUM', same_partiton_sum]
]
for operator, expected in aggregate_single_partition_configs:
cls._all_tests.append([
'{} SinglePartition {}'.format(operator, 'SELECT VALUE'),
aggregate_single_partition_format.format(
operator, _config.FIELD, _config.PARTITION_KEY, _config.UNIQUE_PARTITION_KEY), expected])
cls._all_tests.append([
'{} SinglePartition {}'.format(operator, 'SELECT'),
aggregate_orderby_single_partition_format.format(
operator, _config.FIELD, _config.PARTITION_KEY, _config.UNIQUE_PARTITION_KEY),
Exception()])
def test_run_all(self):
for test_name, query, expected_result in self._all_tests:
test_name = "test_%s" % test_name
try:
self._run_one(query, expected_result)
print(test_name + ': ' + query + " PASSED")
except Exception as e:
print(test_name + ': ' + query + " FAILED")
raise e
def _run_one(self, query, expected_result):
self._execute_query_and_validate_results(self.created_collection, query, expected_result)
@classmethod
def _create_collection(cls, created_db):
# type: (Database) -> Container
created_collection = created_db.create_container(
id='aggregate tests collection ' + str(uuid.uuid4()),
indexing_policy={
'includedPaths': [
{
'path': '/',
'indexes': [
{
'kind': 'Range',
'dataType': 'Number'
},
{
'kind': 'Range',
'dataType': 'String'
}
]
}
]
},
partition_key=PartitionKey(
path='/{}'.format(_config.PARTITION_KEY),
kind=documents.PartitionKind.Hash,
),
offer_throughput=10100
)
return created_collection
@classmethod
def _insert_doc(cls, collection, document_definitions):
# type: (Container, Dict[str, Any]) -> Dict[str, Any]
created_docs = []
for d in document_definitions:
created_doc = collection.create_item(body=d)
created_docs.append(created_doc)
return created_docs
def _execute_query_and_validate_results(self, collection, query, expected):
# type: (Container, str, Dict[str, Any]) -> None
# executes the query and validates the results against the expected results
result_iterable = collection.query_items(
query=query,
enable_cross_partition_query=True
)
def _verify_result():
######################################
# test next() behavior
######################################
it = result_iterable.__iter__()
def invokeNext():
return next(it)
# validate that invocations of next() produces the same results as expected
item = invokeNext()
self.assertEqual(item, expected)
# after the result set is exhausted, invoking next must raise a StopIteration exception
self.assertRaises(StopIteration, invokeNext)
######################################
# test by_page() behavior
######################################
page_iter = result_iterable.by_page()
fetched_res = list(next(page_iter))
fetched_size = len(fetched_res)
self.assertEqual(fetched_size, 1)
self.assertEqual(fetched_res[0], expected)
# no more results will be returned
with self.assertRaises(StopIteration):
next(page_iter)
if isinstance(expected, Exception):
self.assertRaises(CosmosHttpResponseError, _verify_result)
else:
_verify_result()
if __name__ == "__main__":
unittest.main()
|