File: base.py

package info (click to toggle)
python-django 3%3A6.0~alpha1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 62,204 kB
  • sloc: python: 370,694; javascript: 19,376; xml: 211; makefile: 187; sh: 28
file content (112 lines) | stat: -rw-r--r-- 3,796 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from abc import ABCMeta, abstractmethod
from inspect import iscoroutinefunction

from asgiref.sync import sync_to_async

from django.conf import settings
from django.tasks import DEFAULT_TASK_QUEUE_NAME
from django.tasks.base import (
    DEFAULT_TASK_PRIORITY,
    TASK_MAX_PRIORITY,
    TASK_MIN_PRIORITY,
    Task,
)
from django.tasks.exceptions import InvalidTask
from django.utils import timezone
from django.utils.inspect import get_func_args, is_module_level_function


class BaseTaskBackend(metaclass=ABCMeta):
    """
    Abstract base class for Task backends.

    Subclasses must implement enqueue(). The ``supports_*`` flags below
    advertise optional features; validate_task() enforces the defer, async
    and priority flags when deciding whether a Task is acceptable.
    """

    task_class = Task

    # Does the backend support Tasks to be enqueued with the run_after
    # attribute?
    supports_defer = False

    # Does the backend support coroutines to be enqueued?
    supports_async_task = False

    # Does the backend support results being retrieved (from any
    # thread/process)?
    supports_get_result = False

    # Does the backend support executing Tasks in a given
    # priority order?
    supports_priority = False

    def __init__(self, alias, params):
        """
        Store the backend alias and read its configuration from ``params``.

        ``params["QUEUES"]`` (default: only DEFAULT_TASK_QUEUE_NAME) is kept
        as a set of accepted queue names; an empty value disables the queue
        name validation done in validate_task(). ``params["OPTIONS"]``
        (default: an empty dict) is stored as-is on ``self.options``.
        """
        self.alias = alias
        self.queues = set(params.get("QUEUES", [DEFAULT_TASK_QUEUE_NAME]))
        self.options = params.get("OPTIONS", {})

    def validate_task(self, task):
        """
        Determine whether the provided Task can be executed by the backend.

        Raise InvalidTask if:
        - the Task function isn't defined at module level,
        - the function is a coroutine function and the backend doesn't
          support async Tasks,
        - the Task takes context but its first argument isn't ``context``,
        - a non-default priority is used on a backend without priority
          support, or the priority isn't a whole number between
          TASK_MIN_PRIORITY and TASK_MAX_PRIORITY (this includes values that
          aren't numbers at all),
        - run_after is set on a backend without defer support, or is a naive
          datetime while USE_TZ is enabled,
        - the Task's queue name isn't one of the backend's queues.
        """
        if not is_module_level_function(task.func):
            raise InvalidTask("Task function must be defined at a module level.")

        if not self.supports_async_task and iscoroutinefunction(task.func):
            raise InvalidTask("Backend does not support async Tasks.")

        task_func_args = get_func_args(task.func)
        if task.takes_context and (
            not task_func_args or task_func_args[0] != "context"
        ):
            raise InvalidTask(
                "Task takes context but does not have a first argument of 'context'."
            )

        if not self.supports_priority and task.priority != DEFAULT_TASK_PRIORITY:
            raise InvalidTask("Backend does not support setting priority of tasks.")
        try:
            # The range test comes first so that NaN (which fails every
            # comparison) is rejected before int() gets a chance to raise.
            priority_is_valid = (
                TASK_MIN_PRIORITY <= task.priority <= TASK_MAX_PRIORITY
                and int(task.priority) == task.priority
            )
        except (TypeError, ValueError):
            # Values that can't be ordered against or converted to an int
            # (None, strings, ...) are invalid priorities, not crashes.
            priority_is_valid = False
        if not priority_is_valid:
            raise InvalidTask(
                f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
                f"{TASK_MAX_PRIORITY}."
            )

        if not self.supports_defer and task.run_after is not None:
            raise InvalidTask("Backend does not support run_after.")

        if (
            settings.USE_TZ
            and task.run_after is not None
            and not timezone.is_aware(task.run_after)
        ):
            raise InvalidTask("run_after must be an aware datetime.")

        # An empty set of queues means any queue name is accepted.
        if self.queues and task.queue_name not in self.queues:
            raise InvalidTask(f"Queue '{task.queue_name}' is not valid for backend.")

    @abstractmethod
    def enqueue(self, task, args, kwargs):
        """Queue up a task to be executed."""

    async def aenqueue(self, task, args, kwargs):
        """
        Queue up a task function (or coroutine) to be executed.

        The default implementation runs the synchronous enqueue() through
        sync_to_async() and returns its result.
        """
        return await sync_to_async(self.enqueue, thread_sensitive=True)(
            task=task, args=args, kwargs=kwargs
        )

    def get_result(self, result_id):
        """
        Retrieve a task result by id.

        Raise TaskResultDoesNotExist if such result does not exist.

        The base implementation always raises NotImplementedError; backends
        that can retrieve results are expected to override it.
        """
        raise NotImplementedError(
            "This backend does not support retrieving or refreshing results."
        )

    async def aget_result(self, result_id):
        """See get_result()."""
        return await sync_to_async(self.get_result, thread_sensitive=True)(
            result_id=result_id
        )

    def check(self, **kwargs):
        """
        Return a list of issues detected for this backend.

        The base implementation ignores ``kwargs`` and returns an empty list;
        subclasses may override it to report problems.
        """
        return []