File: read_items_sample_async.py

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE.txt in the project root for
# license information.
# -------------------------------------------------------------------------
import asyncio
import uuid
import config
from azure.cosmos.aio import CosmosClient, ContainerProxy
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import PartitionKey

# ----------------------------------------------------------------------------------------------------------
# Prerequisites -
#
# 1. An Azure Cosmos account -
#    https://learn.microsoft.com/azure/cosmos-db/nosql/quickstart-portal#create-account
#
# 2. Microsoft Azure Cosmos PyPi package -
#    https://pypi.python.org/pypi/azure-cosmos/
# ----------------------------------------------------------------------------------------------------------
# Sample - demonstrates the asynchronous read_items API for Azure Cosmos DB
# ----------------------------------------------------------------------------------------------------------

HOST = config.settings['host']
MASTER_KEY = config.settings['master_key']
DATABASE_ID = "read_items_async_db"
CONTAINER_ID = "read_items_async_container"
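
# NOTE: config.py is not shown here; it is assumed to expose a ``settings`` dict
# with the 'host' and 'master_key' entries read above, e.g. (placeholder values,
# supply your own account details):
#
#   settings = {
#       'host': 'https://<your-cosmos-account>.documents.azure.com:443/',
#       'master_key': '<your-cosmos-account-key>',
#   }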


async def create_items(container: ContainerProxy, num_items: int) -> list:
    """Helper function to create items in the container and return a list for read_items."""
    print(f"Creating {num_items} items...")
    items_to_read = []
    for i in range(num_items):
        doc_id = f"item_{i}_{uuid.uuid4()}"
        # For this sample, the partition key is the same as the item id
        pk_value = doc_id
        item_body = {'id': doc_id, 'data': i}
        await container.create_item(body=item_body)
        items_to_read.append((doc_id, pk_value))
    print(f"{num_items} items created.")
    return items_to_read
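
# The loop in create_items issues its writes sequentially to keep the sample easy
# to follow. As a sketch only (not part of the original sample), the same items
# could be created concurrently with asyncio.gather:
#
#   async def create_items_concurrently(container, num_items):
#       async def _create(i):
#           doc_id = f"item_{i}_{uuid.uuid4()}"
#           await container.create_item(body={'id': doc_id, 'data': i})
#           return (doc_id, doc_id)
#       return list(await asyncio.gather(*(_create(i) for i in range(num_items))))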


async def demonstrate_read_items(container: ContainerProxy) -> None:
    """Demonstrates various scenarios for the read_items API."""
    print("\n--- 1. Basic read_items usage with a non-existent item ---")
    items_to_read = await create_items(container, 5)
    # Add a non-existent item to the list to show it's ignored in the result
    items_to_read.append(("non_existent_item", "non_existent_pk"))

    read_results = await container.read_items(items=items_to_read)
    print(f"Successfully read {len(read_results)} items out of {len(items_to_read)} requested.")
    for item in read_results:
        print(f"  - Read item with id: {item.get('id')}")

    print("\n--- 2. Reading a large number of items to show concurrency ---")
    # This demonstrates how read_items handles concurrency and query chunking.
    # The SDK will split the 1100 items into multiple backend queries.
    large_items_list = await create_items(container, 1100)
    large_read_results = await container.read_items(items=large_items_list)
    print(f"Successfully read {len(large_read_results)} items.")
    headers = large_read_results.get_response_headers()
    if headers:
        print(f"Aggregated request charge for large read: {headers.get('x-ms-request-charge')}")

    print("\n--- 3. Using a response_hook to capture results and headers ---")
    hook_captured_data = {}

    def response_hook(headers, results):
        """A simple hook to capture the aggregated headers and the final result list."""
        print("Response hook called!")
        hook_captured_data['headers'] = headers
        hook_captured_data['results'] = results
        hook_captured_data['call_count'] = hook_captured_data.get('call_count', 0) + 1

    items_for_hook = await create_items(container, 10)
    hook_results = await container.read_items(
        items=items_for_hook,
        response_hook=response_hook
    )

    print(f"Response hook was called {hook_captured_data.get('call_count', 0)} time(s).")
    if 'headers' in hook_captured_data:
        print(f"Aggregated request charge from hook: {hook_captured_data['headers'].get('x-ms-request-charge')}")
    print(f"Result list from hook is the same as returned list: {hook_captured_data['results'] is hook_results}")


async def run_sample():
    """An asynchronous sample for the read_items API."""
    client = CosmosClient(HOST, {'masterKey': MASTER_KEY})
    db = None
    try:
        # Create a database
        db = await client.create_database_if_not_exists(id=DATABASE_ID)
        print(f"Database '{DATABASE_ID}' created or already exists.")

        # Create a container with /id as the partition key
        partition_key = PartitionKey(path="/id")
        container = await db.create_container_if_not_exists(
            id=CONTAINER_ID,
            partition_key=partition_key
        )
        print(f"Container '{CONTAINER_ID}' created or already exists.")

        await demonstrate_read_items(container)

    except exceptions.CosmosHttpResponseError as e:
        print(f"\nAn HTTP error occurred: {e.message}")
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
    finally:
        if db:
            print("\n--- Cleaning up ---")
            try:
                await client.delete_database(db)
                print(f"Database '{DATABASE_ID}' cleaned up.")
            except exceptions.CosmosResourceNotFoundError:
                print(f"Database '{DATABASE_ID}' was not found, cleanup not needed.")
        await client.close()
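        # Note: instead of calling close() explicitly, the client could be used
        # as an async context manager (a sketch of an alternative, not what this
        # sample does):
        #
        #   async with CosmosClient(HOST, {'masterKey': MASTER_KEY}) as client:
        #       ...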


if __name__ == '__main__':
    asyncio.run(run_sample())