File: backend.py

package info (click to toggle)
python-django-tasks-db 0.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 344 kB
  • sloc: python: 2,227; sh: 5; makefile: 4
file content (161 lines) | stat: -rw-r--r-- 5,437 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from collections.abc import Iterable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, TypeVar

from django import VERSION
from django.apps import apps
from django.core import checks
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.db.models import Expression
from django.utils.module_loading import import_string
from django.utils.version import PY311
from django_tasks.backends.base import BaseTaskBackend
from django_tasks.base import Task
from django_tasks.base import TaskResult as BaseTaskResult
from django_tasks.exceptions import TaskResultDoesNotExist
from django_tasks.signals import task_enqueued
from django_tasks.utils import normalize_json
from typing_extensions import ParamSpec

if TYPE_CHECKING:
    from .models import DBTaskResult

T = TypeVar("T")
P = ParamSpec("P")


# Frozen, keyword-only dataclass; ``slots`` is only enabled when the
# ``django.utils.version.PY311`` flag is true.
@dataclass(frozen=True, slots=PY311, kw_only=True)  # type: ignore[literal-required]
class TaskResult(BaseTaskResult[T]):
    """Base task result extended with a reference to a ``DBTaskResult``."""

    # The ``DBTaskResult`` model instance associated with this result.
    db_result: "DBTaskResult"


class DatabaseBackend(BaseTaskBackend):
    """Task backend that records every enqueued task as a ``DBTaskResult``.

    Both a synchronous and an asynchronous API are provided for enqueueing
    tasks and for looking results up again by id.
    """

    supports_async_task = True
    supports_get_result = True
    supports_defer = True
    supports_priority = True

    def __init__(self, alias: str, params: dict) -> None:
        """Set up the backend and resolve the callable that generates ids.

        The ``id_function`` option may be a callable or a dotted import path.
        When the option is missing (or falsy), the default of the
        ``DBTaskResult`` primary key field is used.
        """
        # NOTE(review): models are imported inside methods throughout this
        # class, presumably to avoid loading them at module import time.
        from .models import DBTaskResult

        super().__init__(alias, params)

        configured = self.options.get("id_function")
        if not configured:
            # Nothing configured: fall back to the default defined on the model.
            self.id_function = DBTaskResult._meta.pk.default
        elif callable(configured):
            self.id_function = configured
        else:
            # Anything else is handed to import_string as a dotted path.
            self.id_function = import_string(configured)

    def _get_id(self) -> Any:
        """Return a fresh id produced by ``self.id_function``.

        Raises:
            ImproperlyConfigured: if the function returned a database
                expression while running on a Django older than 6.0.
        """
        new_id = self.id_function()

        expressions_allowed = VERSION >= (6, 0)
        if not expressions_allowed and isinstance(new_id, Expression):
            raise ImproperlyConfigured(
                "id_function cannot be a database expression until Django 6.0"
            )

        return new_id

    def _build_db_task_fields(
        self,
        task: Task[P, T],
        args: P.args,  # type:ignore[valid-type]
        kwargs: P.kwargs,  # type:ignore[valid-type]
    ) -> dict[str, Any]:
        """Return the field values used to create a ``DBTaskResult``.

        Shared by the sync and async creation paths so that both store
        exactly the same data.
        """
        return {
            "id": self._get_id(),
            "args_kwargs": normalize_json({"args": args, "kwargs": kwargs}),
            "priority": task.priority,
            "task_path": task.module_path,
            "queue_name": task.queue_name,
            "run_after": task.run_after,  # type: ignore[misc]
            "backend_name": self.alias,
        }

    def _task_to_db_task(
        self,
        task: Task[P, T],
        args: P.args,  # type:ignore[valid-type]
        kwargs: P.kwargs,  # type:ignore[valid-type]
    ) -> "DBTaskResult":
        """Create and return the ``DBTaskResult`` for ``task`` (sync)."""
        from .models import DBTaskResult

        fields = self._build_db_task_fields(task, args, kwargs)
        return DBTaskResult.objects.create(**fields)

    async def _atask_to_db_task(
        self,
        task: Task[P, T],
        args: P.args,  # type:ignore[valid-type]
        kwargs: P.kwargs,  # type:ignore[valid-type]
    ) -> "DBTaskResult":
        """Create and return the ``DBTaskResult`` for ``task`` (async)."""
        from .models import DBTaskResult

        fields = self._build_db_task_fields(task, args, kwargs)
        return await DBTaskResult.objects.acreate(**fields)

    def enqueue(
        self,
        task: Task[P, T],
        args: P.args,  # type:ignore[valid-type]
        kwargs: P.kwargs,  # type:ignore[valid-type]
    ) -> TaskResult[T]:
        """Validate ``task``, store it, send ``task_enqueued`` and return its result."""
        self.validate_task(task)

        stored = self._task_to_db_task(task, args, kwargs)

        # The backend class, not the instance, is the signal sender.
        task_enqueued.send(type(self), task_result=stored.task_result)

        return stored.task_result

    async def _asend_task_enqueued_signal(self, task_result: TaskResult) -> None:
        """Send ``task_enqueued`` from async code.

        From Django 5.0 on, ``Signal.asend`` is awaited directly; on older
        versions the synchronous ``send`` is wrapped with ``sync_to_async``.
        """
        if VERSION >= (5, 0):
            await task_enqueued.asend(type(self), task_result=task_result)
            return

        from asgiref.sync import sync_to_async

        send_from_async = sync_to_async(task_enqueued.send, thread_sensitive=True)
        await send_from_async(type(self), task_result=task_result)

    async def aenqueue(
        self,
        task: Task[P, T],
        args: P.args,  # type:ignore[valid-type]
        kwargs: P.kwargs,  #  type:ignore[valid-type]
    ) -> TaskResult[T]:
        """Async counterpart of ``enqueue``."""
        self.validate_task(task)

        stored = await self._atask_to_db_task(task, args, kwargs)

        await self._asend_task_enqueued_signal(stored.task_result)

        return stored.task_result

    def get_result(self, result_id: str) -> TaskResult:
        """Return the task result stored under ``result_id``.

        Raises:
            TaskResultDoesNotExist: if the lookup raises
                ``DBTaskResult.DoesNotExist`` or ``ValidationError``.
        """
        from .models import DBTaskResult

        try:
            stored = DBTaskResult.objects.get(id=result_id)
            return stored.task_result
        except (DBTaskResult.DoesNotExist, ValidationError) as exc:
            raise TaskResultDoesNotExist(result_id) from exc

    async def aget_result(self, result_id: str) -> TaskResult:
        """Async counterpart of ``get_result``.

        Raises:
            TaskResultDoesNotExist: if the lookup raises
                ``DBTaskResult.DoesNotExist`` or ``ValidationError``.
        """
        from .models import DBTaskResult

        try:
            stored = await DBTaskResult.objects.aget(id=result_id)
            return stored.task_result
        except (DBTaskResult.DoesNotExist, ValidationError) as exc:
            raise TaskResultDoesNotExist(result_id) from exc

    def check(self, **kwargs: Any) -> Iterable[checks.CheckMessage]:
        """Yield the parent's check messages, plus an error if the app is not installed."""
        yield from super().check(**kwargs)

        if apps.is_installed("django_tasks_db"):
            return

        backend_name = self.__class__.__name__
        yield checks.Error(
            f"{backend_name} configured as django_tasks_db backend, but database app not installed",
            hint="Insert 'django_tasks_db' in INSTALLED_APPS",
        )