File: signals.py

package info (click to toggle)
python-django-tasks 0.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 256 kB
  • sloc: python: 1,703; sh: 5; makefile: 4
file content (70 lines) | stat: -rw-r--r-- 1,883 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import logging

from asgiref.local import Local
from django.core.signals import setting_changed
from django.dispatch import Signal, receiver

from django_tasks import BaseTaskBackend, TaskResult, TaskResultStatus

# Signals connected to the logging receivers defined later in this module.
# Those receivers accept the backend class as ``sender`` and a ``task_result``
# keyword argument. NOTE(review): the code that sends these signals is not in
# this file -- presumably the task backends fire them; confirm against callers.
task_enqueued = Signal()
task_finished = Signal()
task_started = Signal()


# Logger named "django_tasks", used by every log receiver in this module.
logger = logging.getLogger("django_tasks")


@receiver(setting_changed)
def clear_tasks_handlers(*, setting: str, **kwargs: dict) -> None:
    """
    Rebuild the task backend handler state when the ``TASKS`` setting changes.

    Connected to Django's ``setting_changed`` signal. Any setting other than
    ``"TASKS"`` is ignored. For ``"TASKS"``, the handler's settings are
    recomputed via ``configure_settings(None)`` and its ``_connections``
    store is replaced with a fresh ``Local``.

    ``setting`` is the name of the setting that changed; the remaining signal
    arguments arrive in ``kwargs`` and are not used.
    """
    if setting != "TASKS":
        return

    # Imported at call time rather than at module level, as in the original.
    from django_tasks import task_backends

    cfg = task_backends.configure_settings(None)
    # Both attributes are pointed at the same freshly configured settings.
    task_backends._settings = task_backends.settings = cfg  # type:ignore[attr-defined]
    task_backends._connections = Local()  # type:ignore[attr-defined]


@receiver(task_enqueued)
def log_task_enqueued(
    sender: type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict
) -> None:
    """
    Emit a DEBUG log line when ``task_enqueued`` fires.

    The message carries the result id, the task's module path and the
    backend recorded on ``task_result``. ``sender`` and ``kwargs`` are unused.
    """
    result_id = task_result.id
    module_path = task_result.task.module_path
    backend = task_result.backend
    logger.debug(
        "Task id=%s path=%s enqueued backend=%s", result_id, module_path, backend
    )


@receiver(task_started)
def log_task_started(
    sender: type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict
) -> None:
    """
    Emit an INFO log line when ``task_started`` fires.

    The message carries the result id, the task's module path and the
    current status of ``task_result``. ``sender`` and ``kwargs`` are unused.
    """
    fields = (task_result.id, task_result.task.module_path, task_result.status)
    logger.info("Task id=%s path=%s state=%s", *fields)


@receiver(task_finished)
def log_task_finished(
    sender: type[BaseTaskBackend], task_result: TaskResult, **kwargs: dict
) -> None:
    """
    Log the final state of a task when ``task_finished`` fires.

    Results whose status is ``TaskResultStatus.FAILED`` are reported through
    ``logger.exception``; every other status is reported through
    ``logger.info``. ``sender`` and ``kwargs`` are unused.
    """
    has_failed = task_result.status == TaskResultStatus.FAILED
    # Use exception to integrate with error monitoring tools
    emit = logger.exception if has_failed else logger.info

    result_id = task_result.id
    module_path = task_result.task.module_path
    emit("Task id=%s path=%s state=%s", result_id, module_path, task_result.status)