File: test_cosmos_responses_async.py

package info (click to toggle)
python-azure 20251118+git-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 783,356 kB
  • sloc: python: 6,474,533; ansic: 804; javascript: 287; sh: 205; makefile: 198; xml: 109
file content (153 lines) | stat: -rw-r--r-- 8,448 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.

import unittest
import uuid

import pytest

import test_config
from azure.cosmos import PartitionKey
from azure.cosmos.aio import CosmosClient, ContainerProxy, DatabaseProxy
from azure.cosmos.http_constants import HttpHeaders


# TODO: add more tests in this file once we have response headers in control plane operations
# TODO: add query tests once those changes are available

@pytest.mark.cosmosEmulator
class TestCosmosResponsesAsync(unittest.IsolatedAsyncioTestCase):
    """Check that async Cosmos operations expose their response headers.

    The tests run against the account configured in ``test_config.TestConfig``.
    Every uniquely named container or database a test creates is recorded and
    deleted again in ``asyncTearDown`` so repeated runs do not pile up
    resources on the account.
    """

    configs = test_config.TestConfig
    host = configs.host
    masterKey = configs.masterKey
    TEST_DATABASE_ID = configs.TEST_DATABASE_ID

    @classmethod
    def setUpClass(cls):
        """Abort the whole class when the account placeholders are still set.

        :raises Exception: if ``masterKey`` or ``host`` still hold their
            placeholder values.
        """
        if (cls.masterKey == '[YOUR_KEY_HERE]' or
                cls.host == '[YOUR_ENDPOINT_HERE]'):
            raise Exception(
                "You must specify your Azure Cosmos account values for "
                "'masterKey' and 'host' at the top of this class to run the "
                "tests.")

    async def asyncSetUp(self):
        """Open a fresh client per test and reset the cleanup bookkeeping."""
        self.client = CosmosClient(self.host, self.masterKey)
        self.test_database = self.client.get_database_client(self.TEST_DATABASE_ID)
        # Ids of resources created by the current test; see asyncTearDown.
        self._created_container_ids = []
        self._created_database_ids = []

    async def asyncTearDown(self):
        """Delete what the test created, then close the client.

        The client is closed in ``finally`` so a failing delete cannot leak
        the underlying connection.
        """
        try:
            for container_id in self._created_container_ids:
                await self.test_database.delete_container(container_id)
            for database_id in self._created_database_ids:
                await self.client.delete_database(database_id)
        finally:
            await self.client.close()

    @staticmethod
    def _unique_id():
        """Return a fresh ``responses_test<uuid4>`` resource id."""
        return "responses_test" + str(uuid.uuid4())

    async def _new_container(self, create, **kwargs):
        """Create a uniquely named ``/company``-partitioned container.

        :param create: the bound creation coroutine to call, e.g.
            ``self.test_database.create_container``.
        :param kwargs: extra keyword arguments forwarded to ``create``.
        :returns: whatever ``create`` returns.
        """
        container_id = self._unique_id()
        result = await create(id=container_id, partition_key=PartitionKey(path="/company"), **kwargs)
        # Registered only after a successful create so teardown never tries
        # to delete something that does not exist.
        self._created_container_ids.append(container_id)
        return result

    async def _new_database(self, create, **kwargs):
        """Create a uniquely named database.

        :param create: the bound creation coroutine to call, e.g.
            ``self.client.create_database``.
        :param kwargs: extra keyword arguments forwarded to ``create``.
        :returns: whatever ``create`` returns.
        """
        database_id = self._unique_id()
        result = await create(id=database_id, **kwargs)
        self._created_database_ids.append(database_id)
        return result

    async def test_point_operation_headers_async(self):
        """Item operations return headers and advance the ``lsn`` header."""
        container = await self._new_container(self.test_database.create_container)
        first_response = await container.upsert_item({"id": str(uuid.uuid4()), "company": "Microsoft"})
        lsn = first_response.get_response_headers()['lsn']

        create_response = await container.create_item({"id": str(uuid.uuid4()), "company": "Microsoft"})
        assert len(create_response.get_response_headers()) > 0
        assert int(lsn) + 1 == int(create_response.get_response_headers()['lsn'])
        lsn = create_response.get_response_headers()['lsn']

        read_response = await container.read_item(create_response['id'], create_response['company'])
        assert len(read_response.get_response_headers()) > 0

        upsert_response = await container.upsert_item({"id": str(uuid.uuid4()), "company": "Microsoft"})
        assert len(upsert_response.get_response_headers()) > 0
        assert int(lsn) + 1 == int(upsert_response.get_response_headers()['lsn'])
        lsn = upsert_response.get_response_headers()['lsn']

        upsert_response['replace'] = True
        replace_response = await container.replace_item(upsert_response['id'], upsert_response)
        assert replace_response['replace'] is not None
        assert len(replace_response.get_response_headers()) > 0
        assert int(lsn) + 1 == int(replace_response.get_response_headers()['lsn'])

        batch = [("create", ({"id": "item" + str(i), "company": "Microsoft"},)) for i in range(50)]
        batch_response = await container.execute_item_batch(batch_operations=batch, partition_key="Microsoft")
        assert len(batch_response.get_response_headers()) > 0
        # lsn still holds the upsert's value (the replace was lsn + 1), so the
        # strict comparison requires the batch to land after the replace.
        assert int(lsn) + 1 < int(batch_response.get_response_headers()['lsn'])

    async def test_create_database_headers_async(self):
        """``create_database(return_properties=True)`` exposes headers."""
        first_response = await self._new_database(self.client.create_database, return_properties=True)

        assert len(first_response[1].get_response_headers()) > 0

    async def test_create_database_returns_database_proxy_async(self):
        """``create_database`` without ``return_properties`` yields a proxy."""
        first_response = await self._new_database(self.client.create_database)
        assert isinstance(first_response, DatabaseProxy)

    async def test_create_database_if_not_exists_headers_async(self):
        """``create_database_if_not_exists`` exposes headers on creation."""
        first_response = await self._new_database(self.client.create_database_if_not_exists,
                                                  return_properties=True)
        assert len(first_response[1].get_response_headers()) > 0

    async def test_create_database_if_not_exists_headers_negative_async(self):
        """Headers are also exposed when the database already exists."""
        # Fixed id on purpose: the first call guarantees the second one hits
        # an existing database. It is not scheduled for deletion.
        await self.client.create_database_if_not_exists(id="responses_test", return_properties=True)
        second_response = await self.client.create_database_if_not_exists(id="responses_test",
                                                                          return_properties=True)
        assert len(second_response[1].get_response_headers()) > 0

    async def test_create_container_headers_async(self):
        """``create_container(return_properties=True)`` exposes headers."""
        first_response = await self._new_container(self.test_database.create_container,
                                                   return_properties=True)
        assert len(first_response[1].get_response_headers()) > 0

    async def test_create_container_returns_container_proxy_async(self):
        """``create_container`` without ``return_properties`` yields a proxy."""
        first_response = await self._new_container(self.test_database.create_container)
        assert isinstance(first_response, ContainerProxy)

    async def test_create_container_if_not_exists_headers_async(self):
        """``create_container_if_not_exists`` exposes headers on creation."""
        first_response = await self._new_container(self.test_database.create_container_if_not_exists,
                                                   return_properties=True)
        assert len(first_response[1].get_response_headers()) > 0

    async def test_create_container_if_not_exists_headers_negative_async(self):
        """Headers are also exposed when the container already exists."""
        # Fixed id on purpose: the first call guarantees the second one hits
        # an existing container. It is not scheduled for deletion.
        await self.test_database.create_container_if_not_exists(
            id="responses_test1", partition_key=PartitionKey(path="/company"), return_properties=True)
        second_response = await self.test_database.create_container_if_not_exists(
            id="responses_test1", partition_key=PartitionKey(path="/company"), return_properties=True)
        assert len(second_response[1].get_response_headers()) > 0

    async def test_replace_container_headers_async(self):
        """``replace_container(return_properties=True)`` exposes headers."""
        container = await self._new_container(self.test_database.create_container_if_not_exists)
        second_response = await self.test_database.replace_container(
            container.id, partition_key=PartitionKey(path="/company"), return_properties=True)
        assert len(second_response[1].get_response_headers()) > 0

    async def test_database_read_headers_async(self):
        """``DatabaseProxy.read`` exposes headers."""
        db = await self._new_database(self.client.create_database)
        first_response = await db.read()
        assert len(first_response.get_response_headers()) > 0

    async def test_container_read_headers_async(self):
        """``ContainerProxy.read`` exposes headers."""
        container = await self._new_container(self.test_database.create_container)
        first_response = await container.read()
        assert len(first_response.get_response_headers()) > 0

    async def test_container_replace_throughput_async(self):
        """Container ``replace_throughput`` exposes headers and takes effect."""
        container = await self._new_container(self.test_database.create_container, offer_throughput=400)
        replace_throughput_value = 500
        first_response = await container.replace_throughput(replace_throughput_value)
        assert len(first_response.get_response_headers()) > 0
        new_throughput = await container.get_throughput()
        assert replace_throughput_value == new_throughput.offer_throughput

    async def test_database_replace_throughput_async(self):
        """Database ``replace_throughput`` exposes headers and takes effect."""
        db = await self._new_database(self.client.create_database, offer_throughput=400)
        replace_throughput_value = 500
        first_response = await db.replace_throughput(replace_throughput_value)
        assert len(first_response.get_response_headers()) > 0
        new_throughput = await db.get_throughput()
        assert replace_throughput_value == new_throughput.offer_throughput


# Run the tests with the unittest runner when this file is executed directly.
if __name__ == '__main__':
    unittest.main()