File: database.py

package info (click to toggle)
django-celery 3.1.17-4
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 916 kB
  • ctags: 1,135
  • sloc: python: 4,149; makefile: 88; sh: 64
file content (61 lines) | stat: -rw-r--r-- 2,010 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from __future__ import absolute_import, unicode_literals

from celery import current_app
from celery.backends.base import BaseDictBackend
from celery.utils.timeutils import maybe_timedelta

from ..models import TaskMeta, TaskSetMeta


class DatabaseBackend(BaseDictBackend):
    """The database backend.

    Using Django models to store task state.

    """
    TaskModel = TaskMeta
    TaskSetModel = TaskSetMeta

    expires = current_app.conf.CELERY_TASK_RESULT_EXPIRES
    create_django_tables = True

    subpolling_interval = 0.5

    def _store_result(self, task_id, result, status,
                      traceback=None, request=None):
        """Store return value and status of an executed task."""
        self.TaskModel._default_manager.store_result(
            task_id, result, status,
            traceback=traceback, children=self.current_task_children(request),
        )
        return result

    def _save_group(self, group_id, result):
        """Store the result of an executed group."""
        self.TaskSetModel._default_manager.store_result(group_id, result)
        return result

    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id."""
        return self.TaskModel._default_manager.get_task(task_id).to_dict()

    def _restore_group(self, group_id):
        """Get group metadata for a group by id."""
        meta = self.TaskSetModel._default_manager.restore_taskset(group_id)
        if meta:
            return meta.to_dict()

    def _delete_group(self, group_id):
        self.TaskSetModel._default_manager.delete_taskset(group_id)

    def _forget(self, task_id):
        try:
            self.TaskModel._default_manager.get(task_id=task_id).delete()
        except self.TaskModel.DoesNotExist:
            pass

    def cleanup(self):
        """Delete expired metadata."""
        expires = maybe_timedelta(self.expires)
        for model in self.TaskModel, self.TaskSetModel:
            model._default_manager.delete_expired(expires)