File: immediate.py

package info (click to toggle)
python-django-tasks 0.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 256 kB
  • sloc: python: 1,703; sh: 5; makefile: 4
file content (113 lines) | stat: -rw-r--r-- 3,528 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import logging
from typing import TypeVar

from django.utils import timezone
from typing_extensions import ParamSpec

from django_tasks.base import Task, TaskContext, TaskError, TaskResult, TaskResultStatus
from django_tasks.signals import task_enqueued, task_finished, task_started
from django_tasks.utils import (
    get_exception_traceback,
    get_module_path,
    get_random_id,
    normalize_json,
)

from .base import BaseTaskBackend

logger = logging.getLogger(__name__)


T = TypeVar("T")
P = ParamSpec("P")


class ImmediateBackend(BaseTaskBackend):
    supports_async_task = True
    supports_priority = True

    def __init__(self, alias: str, params: dict):
        super().__init__(alias, params)

        self.worker_id = get_random_id()

    def _execute_task(self, task_result: TaskResult) -> None:
        """
        Execute the Task for the given TaskResult, mutating it with the outcome
        """
        object.__setattr__(task_result, "enqueued_at", timezone.now())
        task_enqueued.send(type(self), task_result=task_result)

        task = task_result.task

        task_start_time = timezone.now()

        object.__setattr__(task_result, "status", TaskResultStatus.RUNNING)
        object.__setattr__(task_result, "started_at", task_start_time)
        object.__setattr__(task_result, "last_attempted_at", task_start_time)
        task_result.worker_ids.append(self.worker_id)
        task_started.send(sender=type(self), task_result=task_result)

        try:
            if task.takes_context:
                raw_return_value = task.call(
                    TaskContext(task_result=task_result),
                    *task_result.args,
                    **task_result.kwargs,
                )
            else:
                raw_return_value = task.call(*task_result.args, **task_result.kwargs)

            object.__setattr__(
                task_result,
                "_return_value",
                normalize_json(raw_return_value),
            )
        except KeyboardInterrupt:
            # If the user tried to terminate, let them
            raise
        except BaseException as e:
            object.__setattr__(task_result, "finished_at", timezone.now())

            task_result.errors.append(
                TaskError(
                    exception_class_path=get_module_path(type(e)),
                    traceback=get_exception_traceback(e),
                )
            )

            object.__setattr__(task_result, "status", TaskResultStatus.FAILED)

            task_finished.send(type(self), task_result=task_result)
        else:
            object.__setattr__(task_result, "finished_at", timezone.now())
            object.__setattr__(task_result, "status", TaskResultStatus.SUCCESSFUL)

            task_finished.send(type(self), task_result=task_result)

    def enqueue(
        self,
        task: Task[P, T],
        args: P.args,  # type:ignore[valid-type]
        kwargs: P.kwargs,  # type:ignore[valid-type]
    ) -> TaskResult[T]:
        self.validate_task(task)

        task_result: TaskResult[T] = TaskResult(
            task=task,
            id=get_random_id(),
            status=TaskResultStatus.READY,
            enqueued_at=None,
            started_at=None,
            last_attempted_at=None,
            finished_at=None,
            args=args,
            kwargs=kwargs,
            backend=self.alias,
            errors=[],
            worker_ids=[],
        )

        self._execute_task(task_result)

        return task_result