File: workers.py

package info (click to toggle)
django-rq 3.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 620 kB
  • sloc: python: 2,964; makefile: 8
file content (65 lines) | stat: -rw-r--r-- 1,868 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from typing import Optional, Type, Union

from rq import Worker
from rq.job import Job
from rq.utils import import_attribute

from django.conf import settings

from .jobs import get_job_class
from .queues import DjangoRQ, get_queues


def get_exception_handlers():
    """
    Custom exception handlers could be defined in settings.py:
    RQ = {
        'EXCEPTION_HANDLERS': ['path.to.handler'],
    }
    """
    from .settings import EXCEPTION_HANDLERS

    return [import_attribute(path) for path in EXCEPTION_HANDLERS]


def get_worker_class(worker_class=None):
    """
    Return worker class from RQ settings, otherwise return Worker.
    If `worker_class` is not None, it is used as an override (can be
    python import path as string).
    """
    RQ = getattr(settings, 'RQ', {})

    if worker_class is None:
        worker_class = Worker
        if 'WORKER_CLASS' in RQ:
            worker_class = RQ.get('WORKER_CLASS')

    if isinstance(worker_class, str):
        worker_class = import_attribute(worker_class)
    return worker_class


def get_worker(
    *queue_names: str,
    job_class: Optional[Union[str, Type[Job]]] = None,
    queue_class: Optional[Union[str, Type[DjangoRQ]]] = None,
    worker_class: Optional[Union[str, Type[Worker]]] = None,
    **kwargs,
) -> Worker:
    """
    Returns a RQ worker for all queues or specified ones.
    """
    job_class = get_job_class(job_class)
    queues = get_queues(*queue_names, job_class=job_class, queue_class=queue_class)
    # normalize queue_class to what get_queues returns
    queue_class = queues[0].__class__
    worker_class = get_worker_class(worker_class)
    return worker_class(
        queues,
        connection=queues[0].connection,
        exception_handlers=get_exception_handlers() or None,
        job_class=job_class,
        queue_class=queue_class,
        **kwargs
    )