File: access_cosmos_with_aad_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (115 lines) | stat: -rw-r--r-- 5,235 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from azure.cosmos.aio import CosmosClient
import azure.cosmos.exceptions as exceptions
from azure.cosmos.partition_key import PartitionKey
from azure.identity.aio import ClientSecretCredential, DefaultAzureCredential
import config
import asyncio

# ----------------------------------------------------------------------------------------------------------
# Prerequisites -
#
# 1. An Azure Cosmos account -
#    https://learn.microsoft.com/azure/cosmos-db/create-sql-api-python#create-a-database-account
#
# 2. Microsoft Azure Cosmos
#    pip install azure-cosmos>=4.3.0b4
# ----------------------------------------------------------------------------------------------------------
# Sample - demonstrates how to authenticate and use your database account using AAD credentials
# Read more about operations allowed for this authorization method: https://aka.ms/cosmos-native-rbac
# ----------------------------------------------------------------------------------------------------------
# Note:
# This sample creates a Container to your database account.
# Each time a Container is created the account will be billed for 1 hour of usage based on
# the provisioned throughput (RU/s) of that account.
# ----------------------------------------------------------------------------------------------------------
# <configureConnectivity>
HOST = config.settings["host"]
MASTER_KEY = config.settings["master_key"]

TENANT_ID = config.settings["tenant_id"]
CLIENT_ID = config.settings["client_id"]
CLIENT_SECRET = config.settings["client_secret"]

DATABASE_ID = config.settings["database_id"]
CONTAINER_ID = config.settings["container_id"]
PARTITION_KEY = PartitionKey(path="/id")


def get_test_item(num):
    test_item = {
        'id': 'Item_' + str(num),
        'test_object': True,
        'lastName': 'Smith'
    }
    return test_item


async def create_sample_resources():
    print("creating sample resources")
    async with CosmosClient(HOST, MASTER_KEY) as client:
        db = await client.create_database(DATABASE_ID)
        await db.create_container(id=CONTAINER_ID, partition_key=PARTITION_KEY)


async def delete_sample_resources():
    print("deleting sample resources")
    async with CosmosClient(HOST, MASTER_KEY) as client:
        await client.delete_database(DATABASE_ID)


async def run_sample():
    # Since Azure Cosmos DB data plane SDK does not cover management operations, we have to create our resources
    # with a master key authenticated client for this sample.
    await create_sample_resources()

    # With this done, you can use your AAD service principal id and secret to create your ClientSecretCredential.
    # The async ClientSecretCredentials, like the async client, also have a context manager,
    # and as such should be used with the `async with` keywords.
    async with ClientSecretCredential(
            tenant_id=TENANT_ID,
            client_id=CLIENT_ID,
            client_secret=CLIENT_SECRET) as aad_credentials:

        # Use your credentials to authenticate your client.
        async with CosmosClient(HOST, aad_credentials) as aad_client:
            print("Showed ClientSecretCredential, now showing DefaultAzureCredential")

    # You can also utilize DefaultAzureCredential rather than directly passing in the id's and secrets.
    # This is the recommended method of authentication, and uses environment variables rather than in-code strings.
    async with DefaultAzureCredential() as aad_credentials:

        # Use your credentials to authenticate your client.
        async with CosmosClient(HOST, aad_credentials) as aad_client:

            # Do any R/W data operations with your authorized AAD client.
            db = aad_client.get_database_client(DATABASE_ID)
            container = db.get_container_client(CONTAINER_ID)

            print("Container info: " + str(container.read()))
            await container.create_item(get_test_item(879))
            print("Point read result: " + str(container.read_item(item='Item_0', partition_key='Item_0')))
            query_results = [item async for item in
                             container.query_items(query='select * from c', partition_key='Item_0')]
            assert len(query_results) == 1
            print("Query result: " + str(query_results[0]))
            await container.delete_item(item='Item_0', partition_key='Item_0')

            # Attempting to do management operations will return a 403 Forbidden exception.
            try:
                await aad_client.delete_database(DATABASE_ID)
            except exceptions.CosmosHttpResponseError as e:
                assert e.status_code == 403
                print("403 error assertion success")

    # To clean up the sample, we use a master key client again to get access to deleting containers/ databases.
    await delete_sample_resources()
    print("end of sample")


if __name__ == "__main__":
    asyncio.run(run_sample())