1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
import unittest
import uuid
import pytest
import test_config
from azure.cosmos import CosmosClient, PartitionKey, ContainerProxy, DatabaseProxy
from azure.cosmos.http_constants import HttpHeaders
# TODO: add more tests in this file once we have response headers in control plane operations
# TODO: add query tests once those changes are available
@pytest.mark.cosmosEmulator
class TestCosmosResponses(unittest.TestCase):
    """Python Cosmos Responses Tests.

    Checks that item point operations and database/container control-plane
    operations expose their response headers through
    ``get_response_headers()``.

    Every uniquely named database or container created by a test is
    registered for deletion with ``addCleanup`` so that repeated runs do not
    pile up resources in the target account.
    """

    configs = test_config.TestConfig
    host = configs.host
    masterKey = configs.masterKey
    client: CosmosClient = None
    test_database: DatabaseProxy = None
    TEST_DATABASE_ID = configs.TEST_DATABASE_ID

    @classmethod
    def setUpClass(cls):
        """Create the client and database proxy shared by all tests.

        :raises Exception: if ``masterKey`` or ``host`` still hold the
            placeholder values.
        """
        if (cls.masterKey == '[YOUR_KEY_HERE]' or
                cls.host == '[YOUR_ENDPOINT_HERE]'):
            raise Exception(
                "You must specify your Azure Cosmos account values for "
                "'masterKey' and 'host' at the top of this class to run the "
                "tests.")
        cls.client = CosmosClient(cls.host, cls.masterKey)
        cls.test_database = cls.client.get_database_client(cls.TEST_DATABASE_ID)

    # ------------------------------------------------------------------
    # Private helpers (leading underscore keeps them out of test collection)
    # ------------------------------------------------------------------

    @staticmethod
    def _unique_id() -> str:
        """Return a fresh ``responses_test<uuid4>`` resource id."""
        return "responses_test" + str(uuid.uuid4())

    def _delete_container_after_test(self, container_id: str) -> None:
        """Schedule deletion of ``container_id`` from the shared test database.

        Call this only after the container was created successfully, so the
        cleanup never targets a container that does not exist.
        """
        self.addCleanup(self.test_database.delete_container, container_id)

    def _delete_database_after_test(self, database_id: str) -> None:
        """Schedule deletion of the database ``database_id``.

        Call this only after the database was created successfully.
        """
        self.addCleanup(self.client.delete_database, database_id)

    @staticmethod
    def _assert_has_headers(response) -> None:
        """Assert that ``response`` carries at least one response header."""
        assert len(response.get_response_headers()) > 0

    # ------------------------------------------------------------------
    # Item (data plane) operations
    # ------------------------------------------------------------------

    def test_point_operation_headers(self):
        """Item writes return headers, and each single write bumps 'lsn' by one."""
        container_id = self._unique_id()
        container = self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"))
        self._delete_container_after_test(container_id)

        # Seed write: only used to obtain a starting 'lsn' value.
        first_response = container.upsert_item({"id": str(uuid.uuid4()), "company": "Microsoft"})
        lsn = first_response.get_response_headers()['lsn']

        create_response = container.create_item({"id": str(uuid.uuid4()), "company": "Microsoft"})
        self._assert_has_headers(create_response)
        assert int(lsn) + 1 == int(create_response.get_response_headers()['lsn'])
        lsn = create_response.get_response_headers()['lsn']

        read_response = container.read_item(create_response['id'], create_response['company'])
        self._assert_has_headers(read_response)

        upsert_response = container.upsert_item({"id": str(uuid.uuid4()), "company": "Microsoft"})
        self._assert_has_headers(upsert_response)
        assert int(lsn) + 1 == int(upsert_response.get_response_headers()['lsn'])
        lsn = upsert_response.get_response_headers()['lsn']

        upsert_response['replace'] = True
        replace_response = container.replace_item(upsert_response['id'], upsert_response)
        assert replace_response['replace'] is not None
        self._assert_has_headers(replace_response)
        assert int(lsn) + 1 == int(replace_response.get_response_headers()['lsn'])

        batch = [
            ("create", ({"id": "item" + str(i), "company": "Microsoft"},))
            for i in range(50)
        ]
        batch_response = container.execute_item_batch(batch_operations=batch, partition_key="Microsoft")
        self._assert_has_headers(batch_response)
        # ``lsn`` still holds the upsert's value here (it is not refreshed
        # after the replace), so the batch must land strictly beyond the
        # replace's 'lsn'.
        assert int(lsn) + 1 < int(batch_response.get_response_headers()['lsn'])

    # ------------------------------------------------------------------
    # Database creation
    # ------------------------------------------------------------------

    def test_create_database_headers(self):
        """``create_database(return_properties=True)`` exposes headers at index 1."""
        database_id = self._unique_id()
        first_response = self.client.create_database(id=database_id, return_properties=True)
        self._delete_database_after_test(database_id)
        self._assert_has_headers(first_response[1])

    def test_create_database_returns_database_proxy(self):
        """Without ``return_properties``, ``create_database`` returns a DatabaseProxy."""
        database_id = self._unique_id()
        first_response = self.client.create_database(id=database_id)
        self._delete_database_after_test(database_id)
        assert isinstance(first_response, DatabaseProxy)

    def test_create_database_if_not_exists_headers(self):
        """``create_database_if_not_exists`` exposes headers for a new database."""
        database_id = self._unique_id()
        first_response = self.client.create_database_if_not_exists(id=database_id, return_properties=True)
        self._delete_database_after_test(database_id)
        self._assert_has_headers(first_response[1])

    def test_create_database_if_not_exists_headers_negative(self):
        """Headers are still exposed when the database already exists."""
        # The id is fixed, so this database is reused rather than accumulated
        # across runs; it is intentionally left in place.
        self.client.create_database_if_not_exists(id="responses_test", return_properties=True)
        second_response = self.client.create_database_if_not_exists(id="responses_test", return_properties=True)
        self._assert_has_headers(second_response[1])

    # ------------------------------------------------------------------
    # Container creation / replacement
    # ------------------------------------------------------------------

    def test_create_container_headers(self):
        """``create_container(return_properties=True)`` exposes headers at index 1."""
        container_id = self._unique_id()
        first_response = self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"), return_properties=True)
        self._delete_container_after_test(container_id)
        self._assert_has_headers(first_response[1])

    def test_create_container_returns_container_proxy(self):
        """Without ``return_properties``, ``create_container`` returns a ContainerProxy."""
        container_id = self._unique_id()
        first_response = self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"))
        self._delete_container_after_test(container_id)
        assert isinstance(first_response, ContainerProxy)

    def test_create_container_if_not_exists_headers(self):
        """``create_container_if_not_exists`` exposes headers for a new container."""
        container_id = self._unique_id()
        first_response = self.test_database.create_container_if_not_exists(
            id=container_id, partition_key=PartitionKey(path="/company"), return_properties=True)
        self._delete_container_after_test(container_id)
        self._assert_has_headers(first_response[1])

    def test_create_container_if_not_exists_headers_negative(self):
        """Headers are still exposed when the container already exists."""
        # Fixed id: reused rather than accumulated across runs, so it is
        # intentionally left in place.
        self.test_database.create_container_if_not_exists(
            id="responses_test", partition_key=PartitionKey(path="/company"), return_properties=True)
        second_response = self.test_database.create_container_if_not_exists(
            id="responses_test", partition_key=PartitionKey(path="/company"), return_properties=True)
        self._assert_has_headers(second_response[1])

    def test_replace_container_headers(self):
        """``replace_container(return_properties=True)`` exposes headers at index 1."""
        container_id = self._unique_id()
        first_response = self.test_database.create_container_if_not_exists(
            id=container_id, partition_key=PartitionKey(path="/company"))
        self._delete_container_after_test(container_id)
        second_response = self.test_database.replace_container(
            first_response.id, partition_key=PartitionKey(path="/company"), return_properties=True)
        self._assert_has_headers(second_response[1])

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------

    def test_database_read_headers(self):
        """``DatabaseProxy.read()`` returns an object carrying response headers."""
        database_id = self._unique_id()
        db = self.client.create_database(id=database_id)
        self._delete_database_after_test(database_id)
        first_response = db.read()
        self._assert_has_headers(first_response)

    def test_container_read_headers(self):
        """``ContainerProxy.read()`` returns an object carrying response headers."""
        container_id = self._unique_id()
        container = self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"))
        self._delete_container_after_test(container_id)
        first_response = container.read()
        self._assert_has_headers(first_response)

    # ------------------------------------------------------------------
    # Throughput replacement
    # ------------------------------------------------------------------

    def test_container_replace_throughput(self):
        """Replacing container throughput returns headers and takes effect."""
        container_id = self._unique_id()
        container = self.test_database.create_container(
            id=container_id, partition_key=PartitionKey(path="/company"), offer_throughput=400)
        self._delete_container_after_test(container_id)
        replace_throughput_value = 500
        first_response = container.replace_throughput(replace_throughput_value)
        self._assert_has_headers(first_response)
        assert replace_throughput_value == container.get_throughput().offer_throughput

    def test_database_replace_throughput(self):
        """Replacing database throughput returns headers and takes effect."""
        database_id = self._unique_id()
        db = self.client.create_database(id=database_id, offer_throughput=400)
        self._delete_database_after_test(database_id)
        replace_throughput_value = 500
        first_response = db.replace_throughput(replace_throughput_value)
        self._assert_has_headers(first_response)
        assert replace_throughput_value == db.get_throughput().offer_throughput
# Run the test suite when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
|