File: test_read_items_partition_split_async.py

package info (click to toggle)
python-azure 20251014%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 766,472 kB
  • sloc: python: 6,314,744; ansic: 804; javascript: 287; makefile: 198; sh: 198; xml: 109
file content (67 lines) | stat: -rw-r--r-- 2,627 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
import unittest
import uuid
import pytest
from azure.cosmos.aio import CosmosClient, DatabaseProxy
import test_config
from azure.cosmos import PartitionKey


@pytest.mark.cosmosSplit
class TestReadItemsPartitionSplitScenarios(unittest.IsolatedAsyncioTestCase):
    """Tests the behavior of read_items in scenarios involving partition splits.

    A new ``CosmosClient`` is opened in ``asyncSetUp`` and closed in
    ``asyncTearDown`` for every test. Containers created by a test are
    removed by that test itself, while the client is still open.
    """

    created_db: DatabaseProxy = None
    client: CosmosClient = None
    # Connection settings and database name come from the shared test config.
    host = test_config.TestConfig.host
    masterKey = test_config.TestConfig.masterKey
    configs = test_config.TestConfig
    TEST_DATABASE_ID = configs.TEST_DATABASE_ID

    async def asyncSetUp(self):
        """Open a client and get a proxy for the database named TEST_DATABASE_ID."""
        self.client = CosmosClient(self.host, self.masterKey)
        self.database = self.client.get_database_client(self.TEST_DATABASE_ID)

    async def asyncTearDown(self):
        """Close the client opened in asyncSetUp."""
        await self.client.close()

    async def test_read_items_with_partition_split_async(self):
        """Tests that read_items works correctly after a partition split.

        Creates a dedicated container, writes five items, reads them with
        ``read_items`` before and after ``trigger_split_async`` is called, and
        checks that the post-split read returns exactly the ids that were
        written. The container is deleted even when a step fails.
        """
        container = await self.database.create_container(
            "read_items_split_test_async" + str(uuid.uuid4()),
            PartitionKey(path="/pk"),
            offer_throughput=400)
        # try/finally (not addAsyncCleanup): unittest runs registered cleanups
        # only after asyncTearDown, by which time the client is already closed.
        try:
            # 1. Create 5 items to read
            item_ids = [f"item_split_{i}_{uuid.uuid4()}" for i in range(5)]
            for i, doc_id in enumerate(item_ids):
                # Add the partition key field 'pk' to the item body
                await container.create_item({'id': doc_id, 'pk': doc_id, 'data': i})
            # Each entry pairs the id with its 'pk' value, which is the same
            # string here (presumably read_items expects (id, partition key)).
            items_to_read = [(doc_id, doc_id) for doc_id in item_ids]

            # 2. Initial read_items call before the split
            print("Performing initial read_items call...")
            initial_read_items = await container.read_items(items=items_to_read)
            self.assertEqual(len(initial_read_items), len(items_to_read))
            print("Initial call successful.")

            # 3. Trigger a partition split
            # NOTE(review): 11000 is presumably the throughput that forces the
            # split; confirm against test_config.TestConfig.trigger_split_async.
            await test_config.TestConfig.trigger_split_async(container, 11000)

            # 4. Call read_items again after the split
            print("Performing post-split read_items call...")
            final_read_items = await container.read_items(items=items_to_read)

            # 5. Verify the results
            self.assertEqual(len(final_read_items), len(items_to_read))
            final_read_ids = {item['id'] for item in final_read_items}
            self.assertSetEqual(final_read_ids, set(item_ids))
            print("Post-split call successful.")
        finally:
            # Always remove the per-test container so a failed run does not
            # leave it behind in the test database.
            await self.database.delete_container(container.id)


# Allow running this test module directly as a script via unittest's runner.
if __name__ == '__main__':
    unittest.main()