File: test_cosmosdbsql.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (139 lines) | stat: -rw-r--r-- 4,994 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from unittest.mock import Mock, call, patch

import pytest

from celery import states
from celery.backends import cosmosdbsql
from celery.backends.cosmosdbsql import CosmosDBSQLBackend
from celery.exceptions import ImproperlyConfigured

MODULE_TO_MOCK = "celery.backends.cosmosdbsql"

pytest.importorskip('pydocumentdb')


class test_DocumentDBBackend:
    def setup_method(self):
        self.url = "cosmosdbsql://:key@endpoint"
        self.backend = CosmosDBSQLBackend(app=self.app, url=self.url)

    def test_missing_third_party_sdk(self):
        pydocumentdb = cosmosdbsql.pydocumentdb
        try:
            cosmosdbsql.pydocumentdb = None
            with pytest.raises(ImproperlyConfigured):
                CosmosDBSQLBackend(app=self.app, url=self.url)
        finally:
            cosmosdbsql.pydocumentdb = pydocumentdb

    def test_bad_connection_url(self):
        with pytest.raises(ImproperlyConfigured):
            CosmosDBSQLBackend._parse_url(
                "cosmosdbsql://:key@")

        with pytest.raises(ImproperlyConfigured):
            CosmosDBSQLBackend._parse_url(
                "cosmosdbsql://:@host")

        with pytest.raises(ImproperlyConfigured):
            CosmosDBSQLBackend._parse_url(
                "cosmosdbsql://corrupted")

    def test_default_connection_url(self):
        endpoint, password = CosmosDBSQLBackend._parse_url(
            "cosmosdbsql://:key@host")

        assert password == "key"
        assert endpoint == "https://host:443"

        endpoint, password = CosmosDBSQLBackend._parse_url(
            "cosmosdbsql://:key@host:443")

        assert password == "key"
        assert endpoint == "https://host:443"

        endpoint, password = CosmosDBSQLBackend._parse_url(
            "cosmosdbsql://:key@host:8080")

        assert password == "key"
        assert endpoint == "http://host:8080"

    def test_bad_partition_key(self):
        with pytest.raises(ValueError):
            CosmosDBSQLBackend._get_partition_key("")

        with pytest.raises(ValueError):
            CosmosDBSQLBackend._get_partition_key("   ")

        with pytest.raises(ValueError):
            CosmosDBSQLBackend._get_partition_key(None)

    def test_bad_consistency_level(self):
        with pytest.raises(ImproperlyConfigured):
            CosmosDBSQLBackend(app=self.app, url=self.url,
                               consistency_level="DoesNotExist")

    @patch(MODULE_TO_MOCK + ".DocumentClient")
    def test_create_client(self, mock_factory):
        mock_instance = Mock()
        mock_factory.return_value = mock_instance
        backend = CosmosDBSQLBackend(app=self.app, url=self.url)

        # ensure database and collection get created on client access...
        assert mock_instance.CreateDatabase.call_count == 0
        assert mock_instance.CreateCollection.call_count == 0
        assert backend._client is not None
        assert mock_instance.CreateDatabase.call_count == 1
        assert mock_instance.CreateCollection.call_count == 1

        # ...but only once per backend instance
        assert backend._client is not None
        assert mock_instance.CreateDatabase.call_count == 1
        assert mock_instance.CreateCollection.call_count == 1

    @patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
    def test_get(self, mock_client):
        self.backend.get(b"mykey")

        mock_client.ReadDocument.assert_has_calls(
            [call("dbs/celerydb/colls/celerycol/docs/mykey",
                  {"partitionKey": "mykey"}),
             call().get("value")])

    @patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
    def test_get_missing(self, mock_client):
        mock_client.ReadDocument.side_effect = \
            cosmosdbsql.HTTPFailure(cosmosdbsql.ERROR_NOT_FOUND)

        assert self.backend.get(b"mykey") is None

    @patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
    def test_set(self, mock_client):
        self.backend._set_with_state(b"mykey", "myvalue", states.SUCCESS)

        mock_client.CreateDocument.assert_called_once_with(
            "dbs/celerydb/colls/celerycol",
            {"id": "mykey", "value": "myvalue"},
            {"partitionKey": "mykey"})

    @patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
    def test_mget(self, mock_client):
        keys = [b"mykey1", b"mykey2"]

        self.backend.mget(keys)

        mock_client.ReadDocument.assert_has_calls(
            [call("dbs/celerydb/colls/celerycol/docs/mykey1",
                  {"partitionKey": "mykey1"}),
             call().get("value"),
             call("dbs/celerydb/colls/celerycol/docs/mykey2",
                  {"partitionKey": "mykey2"}),
             call().get("value")])

    @patch(MODULE_TO_MOCK + ".CosmosDBSQLBackend._client")
    def test_delete(self, mock_client):
        self.backend.delete(b"mykey")

        mock_client.DeleteDocument.assert_called_once_with(
            "dbs/celerydb/colls/celerycol/docs/mykey",
            {"partitionKey": "mykey"})