File: __init__.py

package info (click to toggle)
python-django 3%3A6.0~alpha1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 62,204 kB
  • sloc: python: 370,694; javascript: 19,376; xml: 211; makefile: 187; sh: 28
file content (45 lines) | stat: -rw-r--r-- 1,178 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from django.utils.connection import BaseConnectionHandler, ConnectionProxy
from django.utils.module_loading import import_string

from . import checks, signals  # NOQA
from .base import (
    DEFAULT_TASK_BACKEND_ALIAS,
    DEFAULT_TASK_QUEUE_NAME,
    Task,
    TaskContext,
    TaskResult,
    TaskResultStatus,
    task,
)
from .exceptions import InvalidTaskBackend

# Public names of this package. ``default_task_backend`` and ``task_backends``
# are defined at the bottom of this module; every other name listed here is
# imported from ``.base`` above.
__all__ = [
    "DEFAULT_TASK_BACKEND_ALIAS",
    "DEFAULT_TASK_QUEUE_NAME",
    "default_task_backend",
    "task",
    "task_backends",
    "Task",
    "TaskContext",
    "TaskResult",
    "TaskResultStatus",
]


class TaskBackendHandler(BaseConnectionHandler):
    """
    Handler that instantiates task backends by alias.

    Per-alias configuration is read from ``self.settings``; ``settings_name``
    is set to ``"TASKS"`` and ``exception_class`` to InvalidTaskBackend for
    use by the base class.
    """

    settings_name = "TASKS"
    exception_class = InvalidTaskBackend

    def create_connection(self, alias):
        """
        Return a new backend instance for ``alias``.

        The dotted path stored under the ``"BACKEND"`` key of the alias's
        configuration is imported and the resulting object is called with
        ``alias`` and the complete configuration as ``params``.

        Raise InvalidTaskBackend, chained to the original ImportError, if the
        dotted path cannot be imported.
        """
        backend_config = self.settings[alias]
        backend = backend_config["BACKEND"]
        try:
            backend_factory = import_string(backend)
        except ImportError as e:
            error_message = f"Could not find backend '{backend}': {e}"
            raise InvalidTaskBackend(error_message) from e
        else:
            # Instantiated outside the try body so that an ImportError raised
            # by the backend's own constructor is not re-labelled.
            return backend_factory(alias=alias, params=backend_config)


# Module-level handler instance; its create_connection() builds the backend
# configured for a given alias.
task_backends = TaskBackendHandler()

# Proxy constructed from the handler above and DEFAULT_TASK_BACKEND_ALIAS.
# NOTE(review): presumably forwards access to the backend stored under that
# alias in ``task_backends`` -- confirm against
# django.utils.connection.ConnectionProxy.
default_task_backend = ConnectionProxy(task_backends, DEFAULT_TASK_BACKEND_ALIAS)