File: base.py

package info (click to toggle)
python-django-tasks 0.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 256 kB
  • sloc: python: 1,703; sh: 5; makefile: 4
file content (139 lines) | stat: -rw-r--r-- 4,543 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from abc import ABCMeta, abstractmethod
from collections.abc import Iterable
from inspect import iscoroutinefunction
from typing import Any, TypeVar

from asgiref.sync import sync_to_async
from django.conf import settings
from django.core import checks
from django.utils import timezone
from django.utils.inspect import get_func_args
from typing_extensions import ParamSpec

from django_tasks.base import (
    DEFAULT_TASK_PRIORITY,
    TASK_MAX_PRIORITY,
    TASK_MIN_PRIORITY,
    Task,
    TaskResult,
)
from django_tasks.exceptions import InvalidTaskError
from django_tasks.utils import is_module_level_function

# Generic placeholders for the backend method signatures in this module:
# ``T`` parametrises the Task / TaskResult pair (``Task[P, T]`` in,
# ``TaskResult[T]`` out) and ``P`` supplies the ``P.args`` / ``P.kwargs``
# annotations on enqueue() and aenqueue().
T = TypeVar("T")
P = ParamSpec("P")


class BaseTaskBackend(metaclass=ABCMeta):
    """
    Abstract base class that task backends derive from.

    Concrete backends must implement ``enqueue()``. Optional capabilities are
    advertised through the ``supports_*`` class flags; ``validate_task()``
    consults ``supports_async_task``, ``supports_priority`` and
    ``supports_defer`` when deciding whether a Task is acceptable.
    """

    alias: str

    task_class = Task

    supports_defer = False
    """Does the backend support Tasks to be enqueued with the run_after attribute?"""

    supports_async_task = False
    """Does the backend support coroutines to be enqueued?"""

    supports_get_result = False
    """Does the backend support results being retrieved (from any thread / process)?"""

    supports_priority = False
    """Does the backend support tasks being executed in a given priority order?"""

    def __init__(self, alias: str, params: dict) -> None:
        """
        Store the backend alias and the settings read from ``params``.

        ``params["QUEUES"]`` (defaulting to a single default queue name) is
        kept as a set in ``self.queues``; ``params["OPTIONS"]`` (defaulting to
        an empty dict) is kept in ``self.options``.
        """
        # NOTE(review): imported at call time rather than at module level,
        # presumably to sidestep a circular import; confirm before moving it.
        from django_tasks import DEFAULT_TASK_QUEUE_NAME

        self.alias = alias

        queue_names = params.get("QUEUES", [DEFAULT_TASK_QUEUE_NAME])
        self.queues = set(queue_names)

        self.options = params.get("OPTIONS", {})

    def validate_task(self, task: Task) -> None:
        """
        Check that this backend is able to execute the given Task.

        Returns None when every requirement is met; otherwise raises
        InvalidTaskError describing the first requirement that failed.
        """
        func = task.func

        # --- the callable itself -------------------------------------------
        if not is_module_level_function(func):
            raise InvalidTaskError("Task function must be defined at a module level.")

        if not self.supports_async_task:
            if iscoroutinefunction(func):
                raise InvalidTaskError("Backend does not support async tasks.")

        arg_names = get_func_args(func)

        if task.takes_context:
            # A context-taking task must declare 'context' as its first argument.
            leading_arg = arg_names[0] if arg_names else None
            if leading_arg != "context":
                raise InvalidTaskError(
                    "Task takes context but does not have a first argument of 'context'."
                )

        # --- priority ------------------------------------------------------
        priority = task.priority

        if not self.supports_priority and priority != DEFAULT_TASK_PRIORITY:
            raise InvalidTaskError(
                "Backend does not support setting priority of tasks."
            )

        out_of_bounds = priority < TASK_MIN_PRIORITY or priority > TASK_MAX_PRIORITY
        # The int() round-trip rejects fractional values that sit inside the bounds.
        if out_of_bounds or int(priority) != priority:
            raise InvalidTaskError(
                f"priority must be a whole number between {TASK_MIN_PRIORITY} and {TASK_MAX_PRIORITY}."
            )

        # --- deferral ------------------------------------------------------
        is_deferred = task.run_after is not None

        if not self.supports_defer and is_deferred:
            raise InvalidTaskError("Backend does not support run_after.")

        if settings.USE_TZ and is_deferred and not timezone.is_aware(task.run_after):
            raise InvalidTaskError("run_after must be an aware datetime.")

        # --- queue ---------------------------------------------------------
        # An empty queue set means the queue name is not checked at all.
        allowed_queues = self.queues
        if allowed_queues and task.queue_name not in allowed_queues:
            raise InvalidTaskError(
                f"Queue '{task.queue_name}' is not valid for backend."
            )

    @abstractmethod
    def enqueue(
        self,
        task: Task[P, T],
        args: P.args,  # type:ignore[valid-type]
        kwargs: P.kwargs,  # type:ignore[valid-type]
    ) -> TaskResult[T]:
        """
        Submit a task, with its positional and keyword arguments, for
        execution and return the corresponding TaskResult.

        Abstract: every concrete backend must provide this.
        """

    async def aenqueue(
        self,
        task: Task[P, T],
        args: P.args,  # type:ignore[valid-type]
        kwargs: P.kwargs,  #  type:ignore[valid-type]
    ) -> TaskResult[T]:
        """
        Awaitable counterpart of enqueue().

        This default implementation runs the synchronous enqueue() through
        sync_to_async with thread_sensitive=True.
        """
        enqueue_async = sync_to_async(self.enqueue, thread_sensitive=True)
        return await enqueue_async(task=task, args=args, kwargs=kwargs)

    def get_result(self, result_id: str) -> TaskResult:
        """
        Look up a result by its id, raising ResultDoesNotExist when there is
        no such result.

        The base implementation always raises NotImplementedError; backends
        that can retrieve results override it.
        """
        raise NotImplementedError(
            "This backend does not support retrieving or refreshing results."
        )

    async def aget_result(self, result_id: str) -> TaskResult:
        """Awaitable counterpart of get_result(); see that method."""
        get_result_async = sync_to_async(self.get_result, thread_sensitive=True)
        return await get_result_async(result_id=result_id)

    def check(self, **kwargs: Any) -> Iterable[checks.CheckMessage]:
        """Return any check messages for this backend; the base reports none."""
        return []