1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
|
# -*- coding: utf-8 -*-
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.
"""End-to-end test.
"""
import json
import logging
import os.path
import time
import unittest
import urllib.parse as urllib
import uuid
import pytest
import requests
from azure.core import MatchConditions
from azure.core.exceptions import AzureError, ServiceResponseError
from azure.core.pipeline.transport import AsyncioRequestsTransport, AsyncioRequestsTransportResponse
import azure.cosmos._base as base
import azure.cosmos.documents as documents
import azure.cosmos.exceptions as exceptions
import test_config
from azure.cosmos.aio import CosmosClient, _retry_utility_async, DatabaseProxy
from azure.cosmos.http_constants import HttpHeaders, StatusCodes
from azure.cosmos.partition_key import PartitionKey
class TimeoutTransport(AsyncioRequestsTransport):
    """Transport stub that stalls and then raises or returns a canned response.

    Unless the caller passes ``passthrough=True``, ``send`` never touches the
    network: it blocks for five seconds and then either raises the stored
    exception or fabricates a response carrying the stored status code.
    """

    def __init__(self, response):
        """Remember the canned outcome for later ``send`` calls.

        :param response: either an ``Exception`` instance, which ``send``
            re-raises, or an awaitable whose result is used as the status
            code of the fabricated response.
        """
        self._response = response
        super().__init__()

    async def send(self, *args, **kwargs):
        """Send a request, or simulate a slow/failed one.

        :keyword bool passthrough: when true, the keyword is removed and the
            call is delegated to the real parent transport.
        :returns: the parent transport's response in passthrough mode,
            otherwise an ``AsyncioRequestsTransportResponse`` wrapping a bare
            ``requests.Response`` with the canned status code.
        :raises Exception: the stored exception, if one was supplied.
        """
        if kwargs.pop("passthrough", False):
            # The parent ``send`` is a coroutine function; without ``await``
            # the caller would receive a coroutine object instead of a
            # transport response.
            return await super().send(*args, **kwargs)

        # Blocking sleep on purpose: it stalls the calling thread (and hence
        # the event loop) for the full five seconds.
        # NOTE(review): presumably long enough for the timeouts under test to
        # elapse -- confirm against the tests that use this transport.
        time.sleep(5)

        if isinstance(self._response, Exception):
            raise self._response

        status_code = await self._response
        output = requests.Response()
        output.status_code = status_code
        return AsyncioRequestsTransportResponse(None, output)
@pytest.mark.cosmosLong
class TestCRUDDatabaseOperationsAsync(unittest.IsolatedAsyncioTestCase):
    """Python CRUD Tests.

    Async end-to-end tests for database-level operations (create, query,
    read, delete, throughput) against the account configured in
    ``test_config.TestConfig``. Each test creates its own uniquely named
    databases and removes them in ``finally`` blocks, so a failed assertion
    does not leave databases behind on the account.
    """
    client: CosmosClient = None
    configs = test_config.TestConfig
    host = configs.host
    masterKey = configs.masterKey
    connectionPolicy = configs.connectionPolicy
    last_headers = []
    database_for_test: DatabaseProxy = None

    async def __assert_http_failure_with_status(self, status_code, func, *args, **kwargs):
        """Assert HTTP failure with status.

        Awaits ``func(*args, **kwargs)`` and expects it to raise
        ``CosmosHttpResponseError`` with the given status code. The test
        fails if the call completes without raising.

        :Parameters:
            - `status_code`: int
            - `func`: function
        """
        try:
            await func(*args, **kwargs)
        except exceptions.CosmosHttpResponseError as inst:
            assert inst.status_code == status_code
        else:
            # Kept outside the ``try`` so that only the awaited call is guarded.
            self.fail('function should fail.')

    @classmethod
    def setUpClass(cls):
        """Abort the whole class when the account placeholders were not replaced."""
        if (cls.masterKey == '[YOUR_KEY_HERE]' or
                cls.host == '[YOUR_ENDPOINT_HERE]'):
            raise Exception(
                "You must specify your Azure Cosmos account values for "
                "'masterKey' and 'host' at the top of this class to run the "
                "tests.")

    async def asyncSetUp(self):
        """Create a fresh client and a proxy for the shared test database."""
        self.client = CosmosClient(self.host, self.masterKey)
        self.database_for_test = self.client.get_database_client(self.configs.TEST_DATABASE_ID)

    async def asyncTearDown(self):
        """Close the client created in ``asyncSetUp``."""
        await self.client.close()

    async def test_database_crud_async(self):
        """Create, query, read and delete a database, then re-create it.

        The second half checks that ``create_database_if_not_exists`` called
        again with a different ``offer_throughput`` hands back the existing
        database with its throughput still at 5000.
        """
        database_id = str(uuid.uuid4())
        created_db = await self.client.create_database(database_id)
        try:
            assert created_db.id == database_id
            # query databases.
            databases = [database async for database in self.client.query_databases(
                query='SELECT * FROM root r WHERE r.id=@id',
                parameters=[
                    {'name': '@id', 'value': database_id}
                ]
            )]
            assert len(databases) > 0

            # read database.
            self.client.get_database_client(created_db.id)
            await created_db.read()
        finally:
            # delete database (also on failure, so nothing leaks).
            await self.client.delete_database(created_db.id)

        # read database after deletion
        read_db = self.client.get_database_client(created_db.id)
        await self.__assert_http_failure_with_status(StatusCodes.NOT_FOUND, read_db.read)

        database_proxy = await self.client.create_database_if_not_exists(id=database_id, offer_throughput=5000)
        try:
            assert database_id == database_proxy.id
            db_throughput = await database_proxy.get_throughput()
            assert 5000 == db_throughput.offer_throughput

            # The database already exists, so the new throughput is not applied.
            database_proxy = await self.client.create_database_if_not_exists(id=database_id, offer_throughput=6000)
            assert database_id == database_proxy.id
            db_throughput = await database_proxy.get_throughput()
            assert 5000 == db_throughput.offer_throughput
        finally:
            # delete database.
            await self.client.delete_database(database_id)

    async def test_database_level_offer_throughput_async(self):
        """Create a database with throughput, verify it, then replace it."""
        # Create a database with throughput
        offer_throughput = 1000
        database_id = str(uuid.uuid4())
        created_db = await self.client.create_database(
            id=database_id,
            offer_throughput=offer_throughput
        )
        try:
            assert created_db.id == database_id

            # Verify offer throughput for database
            offer = await created_db.get_throughput()
            assert offer.offer_throughput == offer_throughput

            # Update database offer throughput
            new_offer_throughput = 2000
            offer = await created_db.replace_throughput(new_offer_throughput)
            assert offer.offer_throughput == new_offer_throughput
        finally:
            await self.client.delete_database(database_id)

    async def test_sql_query_crud_async(self):
        """Query databases with parameters, without parameters, and by string."""
        # Track what was actually created so cleanup works even if the second
        # create call or any assertion fails.
        created = []
        try:
            # create two databases.
            db1 = await self.client.create_database('database 1' + str(uuid.uuid4()))
            created.append(db1)
            db2 = await self.client.create_database('database 2' + str(uuid.uuid4()))
            created.append(db2)

            # query with parameters.
            databases = [database async for database in self.client.query_databases(
                query='SELECT * FROM root r WHERE r.id=@id',
                parameters=[
                    {'name': '@id', 'value': db1.id}
                ]
            )]
            assert 1 == len(databases)

            # query without parameters.
            databases = [database async for database in self.client.query_databases(
                query='SELECT * FROM root r WHERE r.id="database non-existing"'
            )]
            assert 0 == len(databases)

            # query with a string.
            query_string = 'SELECT * FROM root r WHERE r.id="' + db2.id + '"'
            databases = [database async for database in
                         self.client.query_databases(query=query_string)]
            assert 1 == len(databases)
        finally:
            for database in created:
                await self.client.delete_database(database.id)

    async def test_database_account_functionality_async(self):
        """Check the database account against the last response headers."""
        # Validate database account functionality.
        database_account = await self.client._get_database_account()
        assert database_account.DatabasesLink == '/dbs/'
        assert database_account.MediaLink == '/media/'

        # The media-storage headers are only compared when present.
        headers = self.client.client_connection.last_response_headers
        if HttpHeaders.MaxMediaStorageUsageInMB in headers:
            assert database_account.MaxMediaStorageUsageInMB == headers[
                HttpHeaders.MaxMediaStorageUsageInMB]
        if HttpHeaders.CurrentMediaStorageUsageInMB in headers:
            assert database_account.CurrentMediaStorageUsageInMB == headers[
                HttpHeaders.CurrentMediaStorageUsageInMB]
        assert database_account.ConsistencyPolicy['defaultConsistencyLevel'] is not None
# Run the suite with the unittest runner when this file is executed as a
# script; importing the module does not start the tests.
if __name__ == "__main__":
    unittest.main()
|