1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
import unittest
import uuid
import pytest
import test_config
from azure.cosmos import PartitionKey
from azure.cosmos.aio import CosmosClient, ContainerProxy, DatabaseProxy
from azure.cosmos.http_constants import HttpHeaders
# TODO: add more tests in this file once we have response headers in control plane operations
# TODO: add query tests once those changes are available
@pytest.mark.cosmosEmulator
class TestCosmosResponsesAsync(unittest.IsolatedAsyncioTestCase):
    """Python Cosmos Responses Tests.

    Async tests checking that item point operations and database/container
    operations expose response headers through ``get_response_headers()``.

    Databases and containers created under a generated unique id are recorded
    after creation and deleted again in ``asyncTearDown`` so that repeated
    runs do not accumulate resources on the account.
    """

    configs = test_config.TestConfig
    host = configs.host
    masterKey = configs.masterKey
    TEST_DATABASE_ID = configs.TEST_DATABASE_ID

    @classmethod
    def setUpClass(cls):
        """Fail fast when the placeholder account values were not replaced.

        :raises Exception: if ``masterKey`` or ``host`` still hold their
            placeholder values.
        """
        if (cls.masterKey == '[YOUR_KEY_HERE]' or
                cls.host == '[YOUR_ENDPOINT_HERE]'):
            raise Exception(
                "You must specify your Azure Cosmos account values for "
                "'masterKey' and 'host' at the top of this class to run the "
                "tests.")

    @staticmethod
    def _unique_id():
        """Return a fresh resource id: ``responses_test`` plus a random UUID."""
        return "responses_test" + str(uuid.uuid4())

    async def asyncSetUp(self):
        """Create a client per test and reset the cleanup bookkeeping."""
        # Ids of resources this test created under a unique name; they are
        # removed again in asyncTearDown.
        self._created_database_ids = []
        self._created_container_ids = []
        self.client = CosmosClient(self.host, self.masterKey)
        self.test_database = self.client.get_database_client(self.TEST_DATABASE_ID)

    async def asyncTearDown(self):
        """Delete the uniquely named resources of this test, then close the client."""
        try:
            for container_id in self._created_container_ids:
                await self.test_database.delete_container(container_id)
            for database_id in self._created_database_ids:
                await self.client.delete_database(database_id)
        finally:
            # Close the client even if a deletion raised.
            await self.client.close()

    async def test_point_operation_headers_async(self):
        """Item writes and reads return headers; 'lsn' advances with each write.

        Asserts that the 'lsn' header grows by exactly one after each single
        create/upsert/replace and by more than one after a 50-item batch.
        """
        container_id = self._unique_id()
        container = await self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"))
        self._created_container_ids.append(container_id)

        first_response = await container.upsert_item({"id": str(uuid.uuid4()), "company": "Microsoft"})
        lsn = first_response.get_response_headers()['lsn']

        create_response = await container.create_item({"id": str(uuid.uuid4()), "company": "Microsoft"})
        assert len(create_response.get_response_headers()) > 0
        assert int(lsn) + 1 == int(create_response.get_response_headers()['lsn'])
        lsn = create_response.get_response_headers()['lsn']

        read_response = await container.read_item(create_response['id'], create_response['company'])
        assert len(read_response.get_response_headers()) > 0

        upsert_response = await container.upsert_item({"id": str(uuid.uuid4()), "company": "Microsoft"})
        assert len(upsert_response.get_response_headers()) > 0
        assert int(lsn) + 1 == int(upsert_response.get_response_headers()['lsn'])
        lsn = upsert_response.get_response_headers()['lsn']

        upsert_response['replace'] = True
        replace_response = await container.replace_item(upsert_response['id'], upsert_response)
        assert replace_response['replace'] is not None
        assert len(replace_response.get_response_headers()) > 0
        assert int(lsn) + 1 == int(replace_response.get_response_headers()['lsn'])

        # `lsn` still holds the value from before the replace, so the batch
        # result must be ahead of it by more than one.
        batch = [("create", ({"id": "item" + str(i), "company": "Microsoft"},)) for i in range(50)]
        batch_response = await container.execute_item_batch(batch_operations=batch, partition_key="Microsoft")
        assert len(batch_response.get_response_headers()) > 0
        assert int(lsn) + 1 < int(batch_response.get_response_headers()['lsn'])

    async def test_create_database_headers_async(self):
        """``create_database(return_properties=True)`` yields headers at index 1."""
        database_id = self._unique_id()
        first_response = await self.client.create_database(id=database_id, return_properties=True)
        self._created_database_ids.append(database_id)
        assert len(first_response[1].get_response_headers()) > 0

    async def test_create_database_returns_database_proxy_async(self):
        """Without ``return_properties`` a ``DatabaseProxy`` is returned."""
        database_id = self._unique_id()
        first_response = await self.client.create_database(id=database_id)
        self._created_database_ids.append(database_id)
        assert isinstance(first_response, DatabaseProxy)

    async def test_create_database_if_not_exists_headers_async(self):
        """``create_database_if_not_exists`` returns headers for a new database."""
        database_id = self._unique_id()
        first_response = await self.client.create_database_if_not_exists(id=database_id, return_properties=True)
        self._created_database_ids.append(database_id)
        assert len(first_response[1].get_response_headers()) > 0

    async def test_create_database_if_not_exists_headers_negative_async(self):
        """A second ``create_database_if_not_exists`` on the same id returns headers."""
        # Fixed id: intentionally not tracked for deletion, it is reused
        # across runs. NOTE(review): other suites may share this id - confirm
        # before adding cleanup here.
        await self.client.create_database_if_not_exists(id="responses_test", return_properties=True)
        second_response = await self.client.create_database_if_not_exists(id="responses_test", return_properties=True)
        assert len(second_response[1].get_response_headers()) > 0

    async def test_create_container_headers_async(self):
        """``create_container(return_properties=True)`` yields headers at index 1."""
        container_id = self._unique_id()
        first_response = await self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"), return_properties=True)
        self._created_container_ids.append(container_id)
        assert len(first_response[1].get_response_headers()) > 0

    async def test_create_container_returns_container_proxy_async(self):
        """Without ``return_properties`` a ``ContainerProxy`` is returned."""
        container_id = self._unique_id()
        first_response = await self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"))
        self._created_container_ids.append(container_id)
        assert isinstance(first_response, ContainerProxy)

    async def test_create_container_if_not_exists_headers_async(self):
        """``create_container_if_not_exists`` returns headers for a new container."""
        container_id = self._unique_id()
        first_response = await self.test_database.create_container_if_not_exists(
            id=container_id, partition_key=PartitionKey(path="/company"), return_properties=True)
        self._created_container_ids.append(container_id)
        assert len(first_response[1].get_response_headers()) > 0

    async def test_create_container_if_not_exists_headers_negative_async(self):
        """A second ``create_container_if_not_exists`` on the same id returns headers."""
        # Fixed id: intentionally not tracked for deletion, it is reused
        # across runs inside the shared test database.
        await self.test_database.create_container_if_not_exists(
            id="responses_test1", partition_key=PartitionKey(path="/company"), return_properties=True)
        second_response = await self.test_database.create_container_if_not_exists(
            id="responses_test1", partition_key=PartitionKey(path="/company"), return_properties=True)
        assert len(second_response[1].get_response_headers()) > 0

    async def test_replace_container_headers_async(self):
        """``replace_container(return_properties=True)`` yields headers at index 1."""
        container_id = self._unique_id()
        first_response = await self.test_database.create_container_if_not_exists(
            id=container_id, partition_key=PartitionKey(path="/company"))
        self._created_container_ids.append(container_id)
        second_response = await self.test_database.replace_container(
            first_response.id, partition_key=PartitionKey(path="/company"), return_properties=True)
        assert len(second_response[1].get_response_headers()) > 0

    async def test_database_read_headers_async(self):
        """``DatabaseProxy.read()`` returns a response carrying headers."""
        database_id = self._unique_id()
        db = await self.client.create_database(id=database_id)
        self._created_database_ids.append(database_id)
        first_response = await db.read()
        assert len(first_response.get_response_headers()) > 0

    async def test_container_read_headers_async(self):
        """``ContainerProxy.read()`` returns a response carrying headers."""
        container_id = self._unique_id()
        container = await self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"))
        self._created_container_ids.append(container_id)
        first_response = await container.read()
        assert len(first_response.get_response_headers()) > 0

    async def test_container_replace_throughput_async(self):
        """Replacing container throughput returns headers and applies the new value."""
        container_id = self._unique_id()
        container = await self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"), offer_throughput=400)
        self._created_container_ids.append(container_id)
        replace_throughput_value = 500
        first_response = await container.replace_throughput(replace_throughput_value)
        assert len(first_response.get_response_headers()) > 0
        new_throughput = await container.get_throughput()
        assert replace_throughput_value == new_throughput.offer_throughput

    async def test_database_replace_throughput_async(self):
        """Replacing database throughput returns headers and applies the new value."""
        database_id = self._unique_id()
        db = await self.client.create_database(id=database_id, offer_throughput=400)
        self._created_database_ids.append(database_id)
        replace_throughput_value = 500
        first_response = await db.replace_throughput(replace_throughput_value)
        assert len(first_response.get_response_headers()) > 0
        new_throughput = await db.get_throughput()
        assert replace_throughput_value == new_throughput.offer_throughput
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|