1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
import unittest
import uuid
import pytest
import test_config
from azure.core import MatchConditions
from azure.cosmos import PartitionKey
from azure.cosmos.aio import CosmosClient, DatabaseProxy
from azure.cosmos.exceptions import CosmosHttpResponseError
@pytest.mark.cosmosEmulator
class TestBackwardsCompatibilityAsync(unittest.IsolatedAsyncioTestCase):
host = test_config.TestConfig.host
masterKey = test_config.TestConfig.masterKey
connectionPolicy = test_config.TestConfig.connectionPolicy
client: CosmosClient = None
created_database: DatabaseProxy = None
TEST_DATABASE_ID = test_config.TestConfig.TEST_DATABASE_ID
@classmethod
def setUpClass(cls):
if (cls.masterKey == '[YOUR_KEY_HERE]' or
cls.host == '[YOUR_ENDPOINT_HERE]'):
raise Exception(
"You must specify your Azure Cosmos account values for "
"'masterKey' and 'host' at the top of this class to run the "
"tests.")
async def asyncSetUp(self):
self.client = CosmosClient(self.host, self.masterKey)
self.created_database = self.client.get_database_client(self.TEST_DATABASE_ID)
async def asyncTearDown(self):
await self.client.close()
async def test_session_token_compatibility_async(self):
# Verifying that behavior is unaffected across the board for using `session_token` on irrelevant methods
# Database
database = await self.client.create_database(str(uuid.uuid4()), session_token=str(uuid.uuid4()))
assert database is not None
database2 = await self.client.create_database_if_not_exists(str(uuid.uuid4()), session_token=str(uuid.uuid4()))
assert database2 is not None
database_list = [db async for db in self.client.list_databases(session_token=str(uuid.uuid4()))]
database_list2 = [db async for db in self.client.query_databases(query="select * from c", session_token=str(uuid.uuid4()))]
assert len(database_list) > 0
assert len(database_list2) > 0
database_read = await database.read(session_token=str(uuid.uuid4()))
assert database_read is not None
await self.client.delete_database(database2.id, session_token=str(uuid.uuid4()))
try:
await database2.read()
pytest.fail("Database read should have failed")
except CosmosHttpResponseError as e:
assert e.status_code == 404
# Container
container = await self.created_database.create_container(str(uuid.uuid4()), PartitionKey(path="/pk"), session_token=str(uuid.uuid4()))
assert container is not None
container2 = await self.created_database.create_container_if_not_exists(str(uuid.uuid4()), PartitionKey(path="/pk"), session_token=str(uuid.uuid4()))
assert container2 is not None
container_list = [cont async for cont in self.created_database.list_containers(session_token=str(uuid.uuid4()))]
container_list2 = [cont async for cont in self.created_database.query_containers(query="select * from c", session_token=str(uuid.uuid4()))]
assert len(container_list) > 0
assert len(container_list2) > 0
container2_read = await container2.read(session_token=str(uuid.uuid4()))
assert container2_read is not None
replace_container = await self.created_database.replace_container(container2, PartitionKey(path="/pk"), default_ttl=30, session_token=str(uuid.uuid4()))
replace_container_read = await replace_container.read()
assert replace_container is not None
assert replace_container_read != container2_read
assert 'defaultTtl' in replace_container_read # Check for default_ttl as a new additional property
assert replace_container_read['defaultTtl'] == 30
await self.created_database.delete_container(replace_container.id, session_token=str(uuid.uuid4()))
try:
await container2.read()
pytest.fail("Container read should have failed")
except CosmosHttpResponseError as e:
assert e.status_code == 404
await self.client.delete_database(database.id)
async def test_etag_match_condition_compatibility_async(self):
# Verifying that behavior is unaffected across the board for using `etag`/`match_condition` on irrelevant methods
# Database
database = await self.client.create_database(str(uuid.uuid4()), etag=str(uuid.uuid4()), match_condition=MatchConditions.IfModified)
assert database is not None
database2 = await self.client.create_database_if_not_exists(str(uuid.uuid4()), etag=str(uuid.uuid4()), match_condition=MatchConditions.IfNotModified)
assert database2 is not None
await self.client.delete_database(database2.id, etag=str(uuid.uuid4()), match_condition=MatchConditions.IfModified)
try:
await database2.read()
pytest.fail("Database read should have failed")
except CosmosHttpResponseError as e:
assert e.status_code == 404
# Container
container = await self.created_database.create_container(str(uuid.uuid4()), PartitionKey(path="/pk"),
etag=str(uuid.uuid4()), match_condition=MatchConditions.IfModified)
assert container is not None
container2 = await self.created_database.create_container_if_not_exists(str(uuid.uuid4()), PartitionKey(path="/pk"),
etag=str(uuid.uuid4()), match_condition=MatchConditions.IfNotModified)
assert container2 is not None
container2_read = await container2.read()
assert container2_read is not None
replace_container = await self.created_database.replace_container(container2, PartitionKey(path="/pk"), default_ttl=30,
etag=str(uuid.uuid4()), match_condition=MatchConditions.IfModified)
replace_container_read = await replace_container.read()
assert replace_container is not None
assert replace_container_read != container2_read
assert 'defaultTtl' in replace_container_read # Check for default_ttl as a new additional property
await self.created_database.delete_container(replace_container.id, etag=str(uuid.uuid4()), match_condition=MatchConditions.IfModified)
try:
await container2.read()
pytest.fail("Container read should have failed")
except CosmosHttpResponseError as e:
assert e.status_code == 404
# Item
item = await container.create_item({"id": str(uuid.uuid4()), "pk": 0}, etag=str(uuid.uuid4()), match_condition=MatchConditions.IfModified)
assert item is not None
item2 = await container.upsert_item({"id": str(uuid.uuid4()), "pk": 0}, etag=str(uuid.uuid4()),
match_condition=MatchConditions.IfNotModified)
assert item2 is not None
item = await container.create_item({"id": str(uuid.uuid4()), "pk": 0}, etag=None, match_condition=None)
assert item is not None
item2 = await container.upsert_item({"id": str(uuid.uuid4()), "pk": 0}, etag=None,
match_condition=None)
assert item2 is not None
batch_operations = [
("create", ({"id": str(uuid.uuid4()), "pk": 0},)),
("replace", (item2['id'], {"id": str(uuid.uuid4()), "pk": 0})),
("read", (item['id'],)),
("upsert", ({"id": str(uuid.uuid4()), "pk": 0},)),
]
batch_results = await container.execute_item_batch(batch_operations, partition_key=0, etag=str(uuid.uuid4()), match_condition=MatchConditions.IfModified)
assert len(batch_results) == 4
for result in batch_results:
assert result['statusCode'] in (200, 201)
await self.client.delete_database(database.id)
if __name__ == '__main__':
unittest.main()
|