File: test_couchdb.py

package info (click to toggle)
celery 5.6.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,376 kB
  • sloc: python: 67,264; sh: 795; makefile: 378
file content (213 lines) | stat: -rw-r--r-- 7,606 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
from unittest.mock import MagicMock, Mock, sentinel
from urllib.parse import urlparse

import pytest
from kombu.utils.encoding import str_to_bytes

from celery import states, uuid
from celery.app import backends
from celery.backends import couchdb as module
from celery.backends.couchdb import CouchBackend
from celery.exceptions import ImproperlyConfigured

try:
    import pycouchdb
except ImportError:
    pycouchdb = None

COUCHDB_CONTAINER = 'celery_container'

pytest.importorskip('pycouchdb')


class test_CouchBackend:

    def setup_method(self):
        self.Server = self.patching('pycouchdb.Server')
        self.backend = CouchBackend(app=self.app)

    def test_init_no_pycouchdb(self):
        """test init no pycouchdb raises"""
        prev, module.pycouchdb = module.pycouchdb, None
        try:
            with pytest.raises(ImproperlyConfigured):
                CouchBackend(app=self.app)
        finally:
            module.pycouchdb = prev

    def test_get_container_exists(self):
        self.backend._connection = sentinel._connection
        connection = self.backend.connection
        assert connection is sentinel._connection
        self.Server.assert_not_called()

    def test_get(self):
        """test_get

        CouchBackend.get should return  and take two params
        db conn to couchdb is mocked.
        """
        x = CouchBackend(app=self.app)
        x._connection = Mock()
        get = x._connection.get = MagicMock()
        assert x.get('1f3fab') == get.return_value['value']
        x._connection.get.assert_called_once_with('1f3fab')

    def test_get_non_existent_key(self):
        x = CouchBackend(app=self.app)
        x._connection = Mock()
        get = x._connection.get = MagicMock()
        get.side_effect = pycouchdb.exceptions.NotFound
        assert x.get('1f3fab') is None
        x._connection.get.assert_called_once_with('1f3fab')

    @pytest.mark.parametrize("key", ['1f3fab', b'1f3fab'])
    def test_set(self, key):
        x = CouchBackend(app=self.app)
        x._connection = Mock()

        x._set_with_state(key, 'value', states.SUCCESS)

        x._connection.save.assert_called_once_with({'_id': '1f3fab',
                                                    'value': 'value'})

    @pytest.mark.parametrize("key", ['1f3fab', b'1f3fab'])
    def test_set_with_conflict(self, key):
        x = CouchBackend(app=self.app)
        x._connection = Mock()
        x._connection.save.side_effect = (pycouchdb.exceptions.Conflict, None)
        get = x._connection.get = MagicMock()

        x._set_with_state(key, 'value', states.SUCCESS)

        x._connection.get.assert_called_once_with('1f3fab')
        x._connection.get('1f3fab').__setitem__.assert_called_once_with(
            'value', 'value')
        x._connection.save.assert_called_with(get('1f3fab'))
        assert x._connection.save.call_count == 2

    def test_delete(self):
        """test_delete

        CouchBackend.delete should return and take two params
        db conn to pycouchdb is mocked.
        TODO Should test on key not exists

        """
        x = CouchBackend(app=self.app)
        x._connection = Mock()
        mocked_delete = x._connection.delete = Mock()
        mocked_delete.return_value = None
        # should return None
        assert x.delete('1f3fab') is None
        x._connection.delete.assert_called_once_with('1f3fab')

    def test_backend_by_url(self, url='couchdb://myhost/mycoolcontainer'):
        from celery.backends.couchdb import CouchBackend
        backend, url_ = backends.by_url(url, self.app.loader)
        assert backend is CouchBackend
        assert url_ == url

    def test_backend_params_by_url(self):
        url = 'couchdb://johndoe:mysecret@myhost:123/mycoolcontainer'
        with self.Celery(backend=url) as app:
            x = app.backend
            assert x.container == 'mycoolcontainer'
            assert x.host == 'myhost'
            assert x.username == 'johndoe'
            assert x.password == 'mysecret'
            assert x.port == 123


class CouchSessionMock:
    """
    Mock for `requests.session` that emulates couchdb storage.
    """

    _store = {}

    def request(self, method, url, stream=False, data=None, params=None,
                headers=None, **kw):
        tid = urlparse(url).path.split("/")[-1]

        response = Mock()
        response.headers = {"content-type": "application/json"}
        response.status_code = 200
        response.content = b''

        if method == "GET":
            if tid not in self._store:
                return self._not_found_response()
            response.content = self._store.get(tid)
        elif method == "PUT":
            self._store[tid] = data
            response.content = str_to_bytes(f'{{"ok":true,"id":"{tid}","rev":"1-revid"}}')
        elif method == "HEAD":
            if tid not in self._store:
                return self._not_found_response()
            response.headers.update({"etag": "1-revid"})
        elif method == "DELETE":
            if tid not in self._store:
                return self._not_found_response()
            del self._store[tid]
            response.content = str_to_bytes(f'{{"ok":true,"id":"{tid}","rev":"1-revid"}}')
        else:
            raise NotImplementedError(f"CouchSessionMock.request() does not handle {method} method")

        return response

    def _not_found_response(self):
        response = Mock()
        response.headers = {"content-type": "application/json"}
        response.status_code = 404
        response.content = str_to_bytes('{"error":"not_found","reason":"missing"}')
        return response


class test_CouchBackend_result:
    def setup_method(self):
        self.backend = CouchBackend(app=self.app)
        resource = pycouchdb.resource.Resource("resource-url", session=CouchSessionMock())
        self.backend._connection = pycouchdb.client.Database(resource, "container")

    def test_get_set_forget(self):
        tid = uuid()
        self.backend.store_result(tid, "successful-result", states.SUCCESS)
        assert self.backend.get_state(tid) == states.SUCCESS
        assert self.backend.get_result(tid) == "successful-result"
        self.backend.forget(tid)
        assert self.backend.get_state(tid) == states.PENDING

    def test_mark_as_started(self):
        tid = uuid()
        self.backend.mark_as_started(tid)
        assert self.backend.get_state(tid) == states.STARTED

    def test_mark_as_revoked(self):
        tid = uuid()
        self.backend.mark_as_revoked(tid)
        assert self.backend.get_state(tid) == states.REVOKED

    def test_mark_as_retry(self):
        tid = uuid()
        try:
            raise KeyError('foo')
        except KeyError as exception:
            import traceback
            trace = '\n'.join(traceback.format_stack())
            self.backend.mark_as_retry(tid, exception, traceback=trace)
            assert self.backend.get_state(tid) == states.RETRY
            assert isinstance(self.backend.get_result(tid), KeyError)
            assert self.backend.get_traceback(tid) == trace

    def test_mark_as_failure(self):
        tid = uuid()
        try:
            raise KeyError('foo')
        except KeyError as exception:
            import traceback
            trace = '\n'.join(traceback.format_stack())
            self.backend.mark_as_failure(tid, exception, traceback=trace)
            assert self.backend.get_state(tid) == states.FAILURE
            assert isinstance(self.backend.get_result(tid), KeyError)
            assert self.backend.get_traceback(tid) == trace