File: __init__.py

package info (click to toggle)
python-django-tasks 0.12.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 256 kB
  • sloc: python: 1,703; sh: 5; makefile: 4
file content (79 lines) | stat: -rw-r--r-- 2,211 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# ruff: noqa: E402
import django_stubs_ext

# Deliberately called before the remaining imports; the file-wide E402
# suppression above (module-level import not at top of file) permits the
# imports that follow this call.
# NOTE(review): presumably this patches Django classes so that generic
# subscripting such as ``BaseConnectionHandler[BaseTaskBackend]`` (used by
# ``TaskBackendHandler`` below) works at runtime -- confirm against the
# django-stubs-ext documentation.
django_stubs_ext.monkeypatch()

import importlib.metadata

from django.conf import global_settings
from django.utils.connection import BaseConnectionHandler, ConnectionProxy
from django.utils.module_loading import import_string

from .backends.base import BaseTaskBackend
from .base import (
    DEFAULT_TASK_BACKEND_ALIAS,
    DEFAULT_TASK_QUEUE_NAME,
    TaskContext,
    TaskResult,
    TaskResultStatus,
    task,
)
from .exceptions import InvalidTaskBackendError

# Version string looked up in the installed package metadata, using this
# module's ``__name__`` as the distribution name.
__version__ = importlib.metadata.version(__name__)

# Names exported by ``from <this package> import *``. ``task_backends`` and
# ``default_task_backend`` are assigned at the bottom of this module; the
# remaining names are imported from ``.base`` above.
__all__ = [
    "task_backends",
    "default_task_backend",
    "DEFAULT_TASK_BACKEND_ALIAS",
    "DEFAULT_TASK_QUEUE_NAME",
    "TaskResultStatus",
    "TaskResult",
    "TaskContext",
    "task",
]


class TaskBackendHandler(BaseConnectionHandler[BaseTaskBackend]):
    """Connection handler that resolves and instantiates task backends.

    Settings are resolved by ``configure_settings`` and each alias is turned
    into a backend instance by ``create_connection``.
    """

    # Attribute name looked up on ``django.conf.global_settings`` in
    # ``configure_settings``; presumably also what the base handler reads
    # from the active settings -- verify against ``BaseConnectionHandler``.
    settings_name = "TASKS"
    exception_class = InvalidTaskBackendError

    def configure_settings(self, settings: dict | None) -> dict:
        """Return the task backend settings, supplying a default if needed.

        The base implementation is consulted first. When it raises
        ``AttributeError``, returns ``None``, or returns the very object
        stored under ``settings_name`` in ``django.conf.global_settings``,
        a mapping with only the default alias, pointing at the immediate
        backend, is returned instead.
        """
        try:
            resolved = super().configure_settings(settings)
        except AttributeError:
            # HACK: Force a default task backend.
            # Can be replaced with `django.conf.global_settings` once vendored.
            resolved = None

        global_default = getattr(global_settings, self.settings_name, None)
        if resolved is not None and resolved is not global_default:
            # Explicit, non-global configuration: use it untouched.
            return resolved

        return {
            DEFAULT_TASK_BACKEND_ALIAS: {
                "BACKEND": "django_tasks.backends.immediate.ImmediateBackend"
            }
        }

    def create_connection(self, alias: str) -> BaseTaskBackend:
        """Instantiate the backend configured under ``alias``.

        The alias' ``"BACKEND"`` entry is imported as a dotted path and the
        resulting class is called with ``alias`` and the alias' full
        parameter mapping.

        Raises:
            InvalidTaskBackendError: if importing the backend path raises
                ``ImportError``; the original error is chained as the cause.
        """
        backend_params = self.settings[alias]
        backend = backend_params["BACKEND"]

        try:
            backend_class = import_string(backend)
        except ImportError as e:
            message = f"Could not find backend '{backend}': {e}"
            raise InvalidTaskBackendError(message) from e

        return backend_class(  # type:ignore[no-any-return]
            alias=alias, params=backend_params
        )


# Module-level handler instance; individual backends are built per alias by
# ``TaskBackendHandler.create_connection``.
task_backends = TaskBackendHandler()

# Proxy bound to the default alias on ``task_backends``. It is annotated as
# ``BaseTaskBackend`` although the object assigned is a ``ConnectionProxy``,
# hence the ``type:ignore[assignment]``.
default_task_backend: BaseTaskBackend = ConnectionProxy(  # type:ignore[assignment]
    task_backends, DEFAULT_TASK_BACKEND_ALIAS
)