File: test_database.py

package info (click to toggle)
django-celery 3.1.17-4
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 916 kB
  • ctags: 1,135
  • sloc: python: 4,149; makefile: 88; sh: 64
file content (105 lines) | stat: -rw-r--r-- 3,253 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from __future__ import absolute_import, unicode_literals

import celery

from datetime import timedelta

from celery import current_app
from celery import states
from celery.result import AsyncResult
from celery.task import PeriodicTask
from celery.utils import gen_unique_id

from djcelery.app import app
from djcelery.backends.database import DatabaseBackend
from djcelery.utils import now
from djcelery.tests.utils import unittest


class SomeClass(object):
    """Simple value holder used as a custom object inside stored results.

    The tests in this module place an instance in a result dict and later
    read ``.data`` back to check the object came through storage intact.
    """

    def __init__(self, data):
        # Keep the given value untouched so it can be compared afterwards.
        self.data = data


class MyPeriodicTask(PeriodicTask):
    """Periodic task fixture: one-second interval, ``run`` returns 42."""

    # Interval between runs.
    run_every = timedelta(seconds=1)
    # Explicit task name for this fixture.
    name = 'c.u.my-periodic-task-244'

    def run(self, **kwargs):
        # Keyword arguments are accepted but ignored; the result is constant.
        return 42


class TestDatabaseBackend(unittest.TestCase):
    """Tests that drive ``DatabaseBackend`` through its result-store API.

    Covers task status/result round trips, ``forget()``, group
    save/restore/delete, and ``cleanup()`` of back-dated rows.
    """

    def test_backend(self):
        # Walks one backend through three task ids: a plain value, a dict
        # carrying a custom object, and a stored exception.
        backend = DatabaseBackend(app=app)

        plain_id = gen_unique_id()
        # Nothing has been stored for this id yet.
        self.assertEqual(backend.get_status(plain_id), states.PENDING)
        self.assertIsNone(backend.get_result(plain_id))

        backend.mark_as_done(plain_id, 42)
        self.assertEqual(backend.get_status(plain_id), states.SUCCESS)
        self.assertEqual(backend.get_result(plain_id), 42)

        # A result containing a SomeClass instance should come back with
        # its attribute intact, i.e. it is serialized properly.
        object_id = gen_unique_id()
        payload = {'foo': 'baz', 'bar': SomeClass(12345)}
        backend.mark_as_done(object_id, payload)
        fetched = backend.get_result(object_id)
        self.assertEqual(fetched.get('foo'), 'baz')
        self.assertEqual(fetched.get('bar').data, 12345)

        failed_id = gen_unique_id()
        # Raise and catch so the backend is handed an exception that was
        # actually thrown rather than a bare instance.
        try:
            raise KeyError('foo')
        except KeyError as exc:
            backend.mark_as_failure(failed_id, exc)

        self.assertEqual(backend.get_status(failed_id), states.FAILURE)
        self.assertIsInstance(backend.get_result(failed_id), KeyError)

    def test_forget(self):
        # A stored result should read back as None once forget() is called.
        backend = DatabaseBackend(app=app)
        task_id = gen_unique_id()
        backend.mark_as_done(task_id, {'foo': 'bar'})

        async_result = AsyncResult(task_id)
        self.assertEqual(async_result.result.get('foo'), 'bar')

        async_result.forget()
        # bug in 3.1.10 means result did not clear cache after forget.
        hit_by_cache_bug = celery.VERSION[:3] == (3, 1, 10)
        if hit_by_cache_bug:
            async_result._cache = None
        self.assertIsNone(async_result.result)

    def test_group_store(self):
        # Group results: absent -> saved -> restored -> deleted -> absent.
        backend = DatabaseBackend(app=app)
        group_id = gen_unique_id()

        self.assertIsNone(backend.restore_group(group_id))

        payload = {'foo': 'baz', 'bar': SomeClass(12345)}
        backend.save_group(group_id, payload)

        restored = backend.restore_group(group_id)
        self.assertIsNotNone(restored)
        self.assertEqual(restored.get('foo'), 'baz')
        self.assertEqual(restored.get('bar').data, 12345)

        backend.delete_group(group_id)
        self.assertIsNone(backend.restore_group(group_id))

    def test_cleanup(self):
        # Stores three results, back-dates two of them past the expiry
        # window, and expects cleanup() to leave exactly one row.
        backend = DatabaseBackend(app=app)
        manager = backend.TaskModel._default_manager

        # Start from an empty table so the counts below are exact.
        manager.all().delete()

        ids = [gen_unique_id() for _ in range(3)]
        for task_id, value in zip(ids, (16, 32, 64)):
            backend.mark_as_done(task_id, value)

        self.assertEqual(manager.count(), 3)

        # Twice the configured expiry, counted back from the current time.
        stale_stamp = now() - current_app.conf.CELERY_TASK_RESULT_EXPIRES * 2
        expired_ids = ids[:-1]
        # Have to avoid save() because it applies the auto_now=True;
        # a queryset update() writes date_done directly.
        (manager
         .filter(task_id__in=expired_ids)
         .update(date_done=stale_stamp))

        backend.cleanup()
        self.assertEqual(manager.count(), 1)