1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
|
import logging
from typing import TypeVar
from django.utils import timezone
from typing_extensions import ParamSpec
from django_tasks.base import Task, TaskContext, TaskError, TaskResult, TaskResultStatus
from django_tasks.signals import task_enqueued, task_finished, task_started
from django_tasks.utils import (
get_exception_traceback,
get_module_path,
get_random_id,
normalize_json,
)
from .base import BaseTaskBackend
logger = logging.getLogger(__name__)
T = TypeVar("T")
P = ParamSpec("P")
class ImmediateBackend(BaseTaskBackend):
supports_async_task = True
supports_priority = True
def __init__(self, alias: str, params: dict):
super().__init__(alias, params)
self.worker_id = get_random_id()
def _execute_task(self, task_result: TaskResult) -> None:
"""
Execute the Task for the given TaskResult, mutating it with the outcome
"""
object.__setattr__(task_result, "enqueued_at", timezone.now())
task_enqueued.send(type(self), task_result=task_result)
task = task_result.task
task_start_time = timezone.now()
object.__setattr__(task_result, "status", TaskResultStatus.RUNNING)
object.__setattr__(task_result, "started_at", task_start_time)
object.__setattr__(task_result, "last_attempted_at", task_start_time)
task_result.worker_ids.append(self.worker_id)
task_started.send(sender=type(self), task_result=task_result)
try:
if task.takes_context:
raw_return_value = task.call(
TaskContext(task_result=task_result),
*task_result.args,
**task_result.kwargs,
)
else:
raw_return_value = task.call(*task_result.args, **task_result.kwargs)
object.__setattr__(
task_result,
"_return_value",
normalize_json(raw_return_value),
)
except KeyboardInterrupt:
# If the user tried to terminate, let them
raise
except BaseException as e:
object.__setattr__(task_result, "finished_at", timezone.now())
task_result.errors.append(
TaskError(
exception_class_path=get_module_path(type(e)),
traceback=get_exception_traceback(e),
)
)
object.__setattr__(task_result, "status", TaskResultStatus.FAILED)
task_finished.send(type(self), task_result=task_result)
else:
object.__setattr__(task_result, "finished_at", timezone.now())
object.__setattr__(task_result, "status", TaskResultStatus.SUCCESSFUL)
task_finished.send(type(self), task_result=task_result)
def enqueue(
self,
task: Task[P, T],
args: P.args, # type:ignore[valid-type]
kwargs: P.kwargs, # type:ignore[valid-type]
) -> TaskResult[T]:
self.validate_task(task)
task_result: TaskResult[T] = TaskResult(
task=task,
id=get_random_id(),
status=TaskResultStatus.READY,
enqueued_at=None,
started_at=None,
last_attempted_at=None,
finished_at=None,
args=args,
kwargs=kwargs,
backend=self.alias,
errors=[],
worker_ids=[],
)
self._execute_task(task_result)
return task_result
|