1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
import pytest
import test_config
import unittest
import uuid
from azure.cosmos import CosmosClient
from itertools import combinations
from azure.cosmos.partition_key import PartitionKey
from typing import List, Mapping, Set
CONFIG = test_config.TestConfig()
HOST = CONFIG.host
KEY = CONFIG.masterKey
DATABASE_ID = CONFIG.TEST_DATABASE_ID
TEST_NAME = "Query FeedRange "
SINGLE_PARTITION_CONTAINER_ID = TEST_NAME + CONFIG.TEST_SINGLE_PARTITION_CONTAINER_ID
MULTI_PARTITION_CONTAINER_ID = TEST_NAME + CONFIG.TEST_MULTI_PARTITION_CONTAINER_ID
TEST_CONTAINERS_IDS = [SINGLE_PARTITION_CONTAINER_ID, MULTI_PARTITION_CONTAINER_ID]
TEST_OFFER_THROUGHPUTS = [CONFIG.THROUGHPUT_FOR_1_PARTITION, CONFIG.THROUGHPUT_FOR_5_PARTITIONS]
PARTITION_KEY = CONFIG.TEST_CONTAINER_PARTITION_KEY
PK_VALUES = ('pk1', 'pk2', 'pk3')
def add_all_pk_values_to_set(items: List[Mapping[str, str]], pk_value_set: Set[str]) -> None:
if len(items) == 0:
return
pk_values = [item[PARTITION_KEY] for item in items if PARTITION_KEY in item]
pk_value_set.update(pk_values)
@pytest.fixture(scope="class", autouse=True)
def setup_and_teardown():
print("Setup: This runs before any tests")
document_definitions = [{PARTITION_KEY: pk, 'id': str(uuid.uuid4())} for pk in PK_VALUES]
database = CosmosClient(HOST, KEY).get_database_client(DATABASE_ID)
for container_id, offer_throughput in zip(TEST_CONTAINERS_IDS, TEST_OFFER_THROUGHPUTS):
container = database.create_container_if_not_exists(
id=container_id,
partition_key=PartitionKey(path='/' + PARTITION_KEY, kind='Hash'),
offer_throughput=offer_throughput)
for document_definition in document_definitions:
container.upsert_item(body=document_definition)
yield
# Code to run after tests
print("Teardown: This runs after all tests")
def get_container(container_id: str):
client = CosmosClient(HOST, KEY)
db = client.get_database_client(DATABASE_ID)
return db.get_container_client(container_id)
@pytest.mark.cosmosQuery
class TestQueryFeedRange:
@pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS)
def test_query_with_feed_range_for_all_partitions(self, container_id):
container = get_container(container_id)
query = 'SELECT * from c'
expected_pk_values = set(PK_VALUES)
actual_pk_values = set()
iter_feed_ranges = list(container.read_feed_ranges())
for feed_range in iter_feed_ranges:
items = list(container.query_items(
query=query,
feed_range=feed_range
))
add_all_pk_values_to_set(items, actual_pk_values)
assert actual_pk_values == expected_pk_values
@pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS)
def test_query_with_feed_range_for_partition_key(self, container_id):
container = get_container(container_id)
query = 'SELECT * from c'
for pk_value in PK_VALUES:
expected_pk_values = {pk_value}
actual_pk_values = set()
feed_range = container.feed_range_from_partition_key(pk_value)
items = list(container.query_items(
query=query,
feed_range=feed_range
))
add_all_pk_values_to_set(items, actual_pk_values)
assert actual_pk_values == expected_pk_values
@pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS)
def test_query_with_both_feed_range_and_partition_key(self, container_id):
container = get_container(container_id)
expected_error_message = "'feed_range' and 'partition_key' are exclusive parameters, please only set one of them."
query = 'SELECT * from c'
partition_key = PK_VALUES[0]
feed_range = container.feed_range_from_partition_key(partition_key)
with pytest.raises(ValueError) as e:
list(container.query_items(
query=query,
feed_range=feed_range,
partition_key=partition_key
))
assert str(e.value) == expected_error_message
@pytest.mark.parametrize('container_id', TEST_CONTAINERS_IDS)
def test_query_with_feed_range_for_a_full_range(self, container_id):
container = get_container(container_id)
query = 'SELECT * from c'
expected_pk_values = set(PK_VALUES)
actual_pk_values = set()
new_range = test_config.create_range(
range_min="",
range_max="FF",
is_min_inclusive=True,
is_max_inclusive=False,
)
feed_range = test_config.create_feed_range_in_dict(new_range)
items = list(container.query_items(
query=query,
feed_range=feed_range
))
add_all_pk_values_to_set(items, actual_pk_values)
assert expected_pk_values.issubset(actual_pk_values)
def test_query_with_static_continuation(self):
container = get_container(SINGLE_PARTITION_CONTAINER_ID)
query = 'SELECT * from c'
# verify continuation token does not have any impact
for i in range(10):
query_by_page = container.query_items(
query=query,
feed_range={
'Range': {'isMaxInclusive': False, 'isMinInclusive': True,
'max': '1FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE',
'min': '0FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF'}},
max_item_count=1,
continuation='-RID:~a0NPAOszCpOChB4AAAAAAA==#RT:1#TRC:2#ISV:2#IEO:65567#QCF:8').by_page()
for page in query_by_page:
items = list(page)
assert len(items) > 0
def test_query_with_continuation(self):
container = get_container(SINGLE_PARTITION_CONTAINER_ID)
query = 'SELECT * from c'
# go through all feed ranges using pagination
feed_ranges = container.read_feed_ranges()
for feed in feed_ranges:
query_kwargs = {
"query": query,
"feed_range": feed,
"priority": "Low",
"max_item_count": 1
}
query_results = container.query_items(**query_kwargs)
pager = query_results.by_page()
first_page = next(pager)
items = list(first_page)
assert len(items) > 0
continuation_token = pager.continuation_token
# use that continuation token to restart the query, and drain it from there
query_kwargs = {
"query": query,
"feed_range": feed,
"continuation": continuation_token,
"priority": "Low",
"max_item_count": 2
}
query_results = container.query_items(**query_kwargs)
pager = query_results.by_page(continuation_token=continuation_token)
for new_page in pager:
items = list(new_page)
assert len(items) > 0
if __name__ == "__main__":
unittest.main()
|