1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
import sys
from datetime import timedelta
import pytest
from billiard.einfo import ExceptionInfo
from celery import result, states, uuid
from kombu.utils.encoding import bytes_to_str
from django_celery_results.backends.cache import CacheBackend
class SomeClass:
def __init__(self, data):
self.data = data
class test_CacheBackend:
def setup_method(self):
self.b = CacheBackend(app=self.app)
def test_mark_as_done(self):
tid = uuid()
assert self.b.get_status(tid) == states.PENDING
assert self.b.get_result(tid) is None
self.b.mark_as_done(tid, 42)
assert self.b.get_status(tid) == states.SUCCESS
assert self.b.get_result(tid) == 42
def test_forget(self):
tid = uuid()
self.b.mark_as_done(tid, {'foo': 'bar'})
assert self.b.get_result(tid).get('foo') == 'bar'
self.b.forget(tid)
assert tid not in self.b._cache
assert self.b.get_result(tid) is None
@pytest.mark.usefixtures('depends_on_current_app')
def test_save_restore_delete_group(self):
group_id = uuid()
result_ids = [uuid() for i in range(10)]
results = list(map(result.AsyncResult, result_ids))
res = result.GroupResult(group_id, results)
res.save(backend=self.b)
saved = result.GroupResult.restore(group_id, backend=self.b)
assert saved.results == results
assert saved.id == group_id
saved.delete(backend=self.b)
assert result.GroupResult.restore(group_id, backend=self.b) is None
def test_is_pickled(self):
tid2 = uuid()
result = {'foo': 'baz', 'bar': SomeClass(12345)}
self.b.mark_as_done(tid2, result)
# is serialized properly.
rindb = self.b.get_result(tid2)
assert rindb.get('foo') == 'baz'
assert rindb.get('bar').data == 12345
def test_convert_key_from_byte_to_str(self):
""" Tests that key in byte form passed into cache
are succesfully completed """
tid = bytes_to_str(uuid())
assert self.b.get_status(tid) == states.PENDING
assert self.b.get_result(tid) is None
self.b.mark_as_done(tid, 42)
assert self.b.get_status(tid) == states.SUCCESS
assert self.b.get_result(tid) == 42
def test_mark_as_failure(self):
einfo = None
tid3 = uuid()
try:
raise KeyError('foo')
except KeyError as exception:
einfo = ExceptionInfo(sys.exc_info())
self.b.mark_as_failure(tid3, exception, traceback=einfo.traceback)
assert self.b.get_status(tid3) == states.FAILURE
assert isinstance(self.b.get_result(tid3), KeyError)
assert self.b.get_traceback(tid3) == einfo.traceback
def test_process_cleanup(self):
self.b.process_cleanup()
def test_set_expires(self):
cb1 = CacheBackend(app=self.app, expires=timedelta(seconds=16))
assert cb1.expires == 16
cb2 = CacheBackend(app=self.app, expires=32)
assert cb2.expires == 32
class test_custom_CacheBackend:
def test_custom_cache_backend(self):
self.app.conf.cache_backend = 'dummy'
b = CacheBackend(app=self.app)
assert (
b.cache_backend.__class__.__module__ == 'django.core.cache.backends.dummy' # noqa
)
|