1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
from unittest.mock import Mock, call, patch
import pytest
from celery import states
from celery.backends import cosmosdbsql
from celery.backends.cosmosdbsql import CosmosDBSQLBackend
from celery.exceptions import ImproperlyConfigured
MODULE_TO_MOCK = "celery.backends.cosmosdbsql"
pytest.importorskip('pydocumentdb')
class test_DocumentDBBackend:
def setup_method(self):
self.url = "cosmosdbsql://:key@endpoint"
self.backend = CosmosDBSQLBackend(app=self.app, url=self.url)
def test_missing_third_party_sdk(self):
pydocumentdb = cosmosdbsql.pydocumentdb
try:
cosmosdbsql.pydocumentdb = None
with pytest.raises(ImproperlyConfigured):
CosmosDBSQLBackend(app=self.app, url=self.url)
finally:
cosmosdbsql.pydocumentdb = pydocumentdb
def test_bad_connection_url(self):
with pytest.raises(ImproperlyConfigured):
CosmosDBSQLBackend._parse_url(
"cosmosdbsql://:key@")
with pytest.raises(ImproperlyConfigured):
CosmosDBSQLBackend._parse_url(
"cosmosdbsql://:@host")
with pytest.raises(ImproperlyConfigured):
CosmosDBSQLBackend._parse_url(
"cosmosdbsql://corrupted")
def test_default_connection_url(self):
endpoint, password = CosmosDBSQLBackend._parse_url(
"cosmosdbsql://:key@host")
assert password == "key"
assert endpoint == "https://host:443"
endpoint, password = CosmosDBSQLBackend._parse_url(
"cosmosdbsql://:key@host:443")
assert password == "key"
assert endpoint == "https://host:443"
endpoint, password = CosmosDBSQLBackend._parse_url(
"cosmosdbsql://:key@host:8080")
assert password == "key"
assert endpoint == "http://host:8080"
def test_bad_partition_key(self):
with pytest.raises(ValueError):
CosmosDBSQLBackend._get_partition_key("")
with pytest.raises(ValueError):
CosmosDBSQLBackend._get_partition_key(" ")
with pytest.raises(ValueError):
CosmosDBSQLBackend._get_partition_key(None)
def test_bad_consistency_level(self):
with pytest.raises(ImproperlyConfigured):
CosmosDBSQLBackend(app=self.app, url=self.url,
consistency_level="DoesNotExist")
@patch(MODULE_TO_MOCK + ".DocumentClient")
def test_create_client(self, mock_factory):
mock_instance = Mock()
mock_factory.return_value = mock_instance
backend = CosmosDBSQLBackend(app=self.app, url=self.url)
# ensure database and collection get created on client access...
assert mock_instance.CreateDatabase.call_count == 0
assert mock_instance.CreateCollection.call_count == 0
assert backend._client is not None
assert mock_instance.CreateDatabase.call_count == 1
assert mock_instance.CreateCollection.call_count == 1
# ...but only once per backend instance
assert backend._client is not None
assert mock_instance.CreateDatabase.call_count == 1
assert mock_instance.CreateCollection.call_count == 1
@patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
def test_get(self, mock_client):
self.backend.get(b"mykey")
mock_client.ReadDocument.assert_has_calls(
[call("dbs/celerydb/colls/celerycol/docs/mykey",
{"partitionKey": "mykey"}),
call().get("value")])
@patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
def test_get_missing(self, mock_client):
mock_client.ReadDocument.side_effect = \
cosmosdbsql.HTTPFailure(cosmosdbsql.ERROR_NOT_FOUND)
assert self.backend.get(b"mykey") is None
@patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
def test_set(self, mock_client):
self.backend._set_with_state(b"mykey", "myvalue", states.SUCCESS)
mock_client.CreateDocument.assert_called_once_with(
"dbs/celerydb/colls/celerycol",
{"id": "mykey", "value": "myvalue"},
{"partitionKey": "mykey"})
@patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
def test_mget(self, mock_client):
keys = [b"mykey1", b"mykey2"]
self.backend.mget(keys)
mock_client.ReadDocument.assert_has_calls(
[call("dbs/celerydb/colls/celerycol/docs/mykey1",
{"partitionKey": "mykey1"}),
call().get("value"),
call("dbs/celerydb/colls/celerycol/docs/mykey2",
{"partitionKey": "mykey2"}),
call().get("value")])
@patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
def test_delete(self, mock_client):
self.backend.delete(b"mykey")
mock_client.DeleteDocument.assert_called_once_with(
"dbs/celerydb/colls/celerycol/docs/mykey",
{"partitionKey": "mykey"})
|