1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
import azure.cosmos.documents as documents
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors
import samples.Shared.config as cfg
# ----------------------------------------------------------------------------------------------------------
# Prerequisites -
#
# 1. An Azure Cosmos account -
# https://docs.microsoft.com/azure/cosmos-db/create-sql-api-python#create-a-database-account
#
# 2. Microsoft Azure Cosmos PyPI package -
# https://pypi.python.org/pypi/azure-cosmos/
# ----------------------------------------------------------------------------------------------------------
# Sample - demonstrates the basic CRUD operations on a Database resource for Azure Cosmos
#
# 1. Query for Database (QueryDatabases)
#
# 2. Create Database (CreateDatabase)
#
# 3. Get a Database by its Id property (ReadDatabase)
#
# 4. List all Database resources on an account (ReadDatabases)
#
# 5. Delete a Database given its Id property (DeleteDatabase)
# ----------------------------------------------------------------------------------------------------------
# Connection settings for the sample, read once at import time from the
# shared sample configuration module (samples.Shared.config).
# Account endpoint handed to CosmosClient as its first argument.
HOST = cfg.settings['host']
# Account key, supplied to CosmosClient under the 'masterKey' auth entry.
MASTER_KEY = cfg.settings['master_key']
# Id of the database that the sample queries, creates, reads and deletes.
DATABASE_ID = cfg.settings['database_id']
class IDisposable:
    """Context manager that exposes a wrapped object inside a ``with`` block.

    On exit the wrapped object's ``close()`` method is called when it has
    one, and the wrapper then drops its reference to the object. Objects
    without a ``close()`` method are simply released.
    """

    def __init__(self, obj):
        """Store *obj*, the object to hand to the ``with`` block."""
        self.obj = obj

    def __enter__(self):
        """Return the wrapped object so it is bound to the ``as`` target."""
        return self.obj

    def __exit__(self, exception_type, exception_val, trace):
        """Close the wrapped object if it supports it, then release it.

        Returns None, so an exception raised inside the ``with`` body is
        never suppressed.
        """
        # Look the method up defensively: not every wrapped object has close().
        close = getattr(self.obj, 'close', None)
        try:
            if callable(close):
                close()
        finally:
            # Drop the reference even if close() itself raised.
            self.obj = None
class DatabaseManagement:
    """Console walkthrough of the basic database operations of the sample.

    Every static method takes a Cosmos client, performs one operation on it
    and prints the outcome; none of them returns a value.
    """

    @staticmethod
    def find_database(client, id):
        """Query for a database by id and print whether a match was found."""
        print('1. Query for Database')

        # The id is passed as the @id parameter rather than spliced into the
        # query text.
        query_spec = {
            "query": "SELECT * FROM r WHERE r.id=@id",
            "parameters": [
                {"name": "@id", "value": id},
            ],
        }
        matches = list(client.QueryDatabases(query_spec))

        if not matches:
            print('No database with id \'{0}\' was found'.format(id))
            return
        print('Database with id \'{0}\' was found'.format(id))

    @staticmethod
    def create_database(client, id):
        """Create a database with the given id.

        An HTTP 409 failure is reported as "already exists"; any other
        HTTPFailure is re-raised.
        """
        print("\n2. Create Database")

        try:
            client.CreateDatabase({"id": id})
        except errors.HTTPFailure as failure:
            if failure.status_code != 409:
                raise
            print('A database with id \'{0}\' already exists'.format(id))
        else:
            print('Database with id \'{0}\' created'.format(id))

    @staticmethod
    def read_database(client, id):
        """Read a database by id and print its ``_self`` link.

        An HTTP 404 failure is reported as "does not exist"; any other
        HTTPFailure is re-raised.
        """
        print("\n3. Get a Database by id")

        # Every Azure Cosmos resource is addressable through a link that is
        # built from the resource hierarchy and the resource id.
        # E.g. the link for a database with an id of Foo would be dbs/Foo
        link = 'dbs/' + id

        try:
            found = client.ReadDatabase(link)
        except errors.HTTPFailure as failure:
            if failure.status_code != 404:
                raise
            print('A database with id \'{0}\' does not exist'.format(id))
        else:
            print('Database with id \'{0}\' was found, it\'s _self is {1}'.format(
                id, found['_self']))

    @staticmethod
    def list_databases(client):
        """Print the id of every database returned by ``ReadDatabases``."""
        print("\n4. List all Databases on an account")
        print('Databases:')

        # Materialize the full result before printing any id.
        for entry in list(client.ReadDatabases()):
            print(entry['id'])

    @staticmethod
    def delete_database(client, id):
        """Delete a database by id.

        An HTTP 404 failure is reported as "does not exist"; any other
        HTTPFailure is re-raised.
        """
        print("\n5. Delete Database")

        link = 'dbs/' + id

        try:
            client.DeleteDatabase(link)
        except errors.HTTPFailure as failure:
            if failure.status_code != 404:
                raise
            print('A database with id \'{0}\' does not exist'.format(id))
        else:
            print('Database with id \'{0}\' was deleted'.format(id))
def run_sample():
    """Run the five database operations of the sample, in order.

    A client is built from HOST and MASTER_KEY and wrapped in IDisposable.
    An HTTPFailure raised by any step is printed rather than propagated, and
    a completion line is printed whether or not a step failed.
    """
    auth = {'masterKey': MASTER_KEY}
    cosmos = cosmos_client.CosmosClient(HOST, auth)

    with IDisposable(cosmos) as client:
        try:
            # 1. look the database up with a query
            DatabaseManagement.find_database(client, DATABASE_ID)

            # 2. create it
            DatabaseManagement.create_database(client, DATABASE_ID)

            # 3. read it back through its id
            DatabaseManagement.read_database(client, DATABASE_ID)

            # 4. enumerate every database on the account
            DatabaseManagement.list_databases(client)

            # 5. remove it again
            DatabaseManagement.delete_database(client, DATABASE_ID)
        except errors.HTTPFailure as failure:
            print('\nrun_sample has caught an error. {0}'.format(failure))
        finally:
            print("\nrun_sample done")
# Script entry point: run the sample only when executed directly.
if __name__ == '__main__':
    try:
        run_sample()
    except Exception as err:
        # Report any failure as a single line with the exception's args and
        # message instead of letting it propagate.
        print("Top level Error: args:{0}, message:{1}".format(err.args, err))
|