1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE.txt in the project root for
# license information.
# -------------------------------------------------------------------------
from azure.cosmos.aio import CosmosClient
import azure.cosmos.exceptions as exceptions
from azure.cosmos import ThroughputProperties
import asyncio
import config
# ----------------------------------------------------------------------------------------------------------
# Prerequisites -
#
# 1. An Azure Cosmos account -
# https://learn.microsoft.com/azure/cosmos-db/create-sql-api-python#create-a-database-account
#
# 2. Microsoft Azure Cosmos PyPi package -
# https://pypi.python.org/pypi/azure-cosmos/
# ----------------------------------------------------------------------------------------------------------
# Sample - demonstrates the basic CRUD operations on a Database resource for Azure Cosmos
#
# 1. Query for Database (QueryDatabases)
#
# 2. Create Database (CreateDatabase)
#
# 3. Get a Database by its Id property (ReadDatabase)
#
# 4. List all Database resources on an account (ReadDatabases)
#
# 5. Delete a Database given its Id property (DeleteDatabase)
# ----------------------------------------------------------------------------------------------------------
HOST = config.settings['host']
MASTER_KEY = config.settings['master_key']
DATABASE_ID = config.settings['database_id']
async def find_database(client, id):
print('1. Query for Database')
# Because the asynchronous client returns an asynchronous iterator object for methods that use
# return several databases using queries, we do not need to await the function. However, attempting
# to cast this object into a list directly will throw an error; instead, iterate over the databases
# to populate your list using an async for loop like shown here or in the list_databases() method
query_databases_response = client.query_databases(query={
"query": "SELECT * FROM r WHERE r.id=@id",
"parameters": [
{ "name":"@id", "value": id }
]
})
databases = [database async for database in query_databases_response]
if len(databases) > 0:
print('Database with id \'{0}\' was found'.format(id))
else:
print('No database with id \'{0}\' was found'. format(id))
# Alternatively, you can directly iterate over the asynchronous iterator without building a separate
# list if you don't need the ordering or indexing capabilities
async for database in query_databases_response:
print(database['id'])
async def create_database(client, id):
print("\n2. Create Database")
try:
await client.create_database(id=id)
print('Database with id \'{0}\' created'.format(id))
except exceptions.CosmosResourceExistsError:
print('A database with id \'{0}\' already exists'.format(id))
print("\n2.8 Create Database - With autoscale settings")
try:
await client.create_database(
id=id,
offer_throughput=ThroughputProperties(auto_scale_max_throughput=5000, auto_scale_increment_percent=0))
print('Database with id \'{0}\' created'.format(id))
except exceptions.CosmosResourceExistsError:
print('A database with id \'{0}\' already exists'.format(id))
# Alternatively, you can also use the create_database_if_not_exists method to avoid using a try catch
# This method attempts to read the database first, and based on the result either creates or returns
# the existing database. Due to the additional overhead from attempting a read, it is recommended
# to use the create_database() method if you know the database doesn't already exist.
await client.create_database_if_not_exists(id=id)
async def read_database(client, id):
print("\n3. Get a Database by id")
try:
database = client.get_database_client(id)
await database.read()
print('Database with id \'{0}\' was found, it\'s link is {1}'.format(id, database.database_link))
except exceptions.CosmosResourceNotFoundError:
print('A database with id \'{0}\' does not exist'.format(id))
async def list_databases(client):
print("\n4. List all Databases on an account")
print('Databases:')
# Because the asynchronous client returns an asynchronous iterator object for methods that use
# return several databases using queries, we do not need to await the function. However, attempting
# to cast this object into a list directly will throw an error; instead, iterate over the databases
# to populate your list using an async for loop like shown here or in the find_database() method
list_databases_response = client.list_databases()
databases = [database async for database in list_databases_response]
if len(databases) == 0:
return
for database in databases:
print(database['id'])
# Alternatively, you can directly iterate over the asynchronous iterator without building a separate
# list if you don't need the ordering or indexing capabilities
async for database in list_databases_response:
print(database['id'])
async def delete_database(client, id):
print("\n5. Delete Database")
try:
await client.delete_database(id)
print('Database with id \'{0}\' was deleted'.format(id))
except exceptions.CosmosResourceNotFoundError:
print('A database with id \'{0}\' does not exist'.format(id))
async def run_sample():
async with CosmosClient(HOST, {'masterKey': MASTER_KEY}) as client:
try:
# query for a database
await find_database(client, DATABASE_ID)
# create a database
await create_database(client, DATABASE_ID)
# get a database using its id
await read_database(client, DATABASE_ID)
# list all databases on an account
await list_databases(client)
# delete database by id
await delete_database(client, DATABASE_ID)
except exceptions.CosmosHttpResponseError as e:
print('\nrun_sample has caught an error. {0}'.format(e.message))
finally:
print("\nrun_sample done")
if __name__ == '__main__':
asyncio.run(run_sample())
|