1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
from typing import Optional, Type, Union
from rq import Worker
from rq.job import Job
from rq.utils import import_attribute
from django.conf import settings
from .jobs import get_job_class
from .queues import DjangoRQ, get_queues
def get_exception_handlers():
"""
Custom exception handlers could be defined in settings.py:
RQ = {
'EXCEPTION_HANDLERS': ['path.to.handler'],
}
"""
from .settings import EXCEPTION_HANDLERS
return [import_attribute(path) for path in EXCEPTION_HANDLERS]
def get_worker_class(worker_class=None):
"""
Return worker class from RQ settings, otherwise return Worker.
If `worker_class` is not None, it is used as an override (can be
python import path as string).
"""
RQ = getattr(settings, 'RQ', {})
if worker_class is None:
worker_class = Worker
if 'WORKER_CLASS' in RQ:
worker_class = RQ.get('WORKER_CLASS')
if isinstance(worker_class, str):
worker_class = import_attribute(worker_class)
return worker_class
def get_worker(
*queue_names: str,
job_class: Optional[Union[str, Type[Job]]] = None,
queue_class: Optional[Union[str, Type[DjangoRQ]]] = None,
worker_class: Optional[Union[str, Type[Worker]]] = None,
**kwargs,
) -> Worker:
"""
Returns a RQ worker for all queues or specified ones.
"""
job_class = get_job_class(job_class)
queues = get_queues(*queue_names, job_class=job_class, queue_class=queue_class)
# normalize queue_class to what get_queues returns
queue_class = queues[0].__class__
worker_class = get_worker_class(worker_class)
return worker_class(
queues,
connection=queues[0].connection,
exception_handlers=get_exception_handlers() or None,
job_class=job_class,
queue_class=queue_class,
**kwargs
)
|