File: rqworker-pool.py

import os
import sys

from django.core.management.base import BaseCommand

from rq.logutils import setup_loghandlers
from rq.serializers import resolve_serializer
from rq.worker_pool import WorkerPool

from ...jobs import get_job_class
from ...queues import get_queues
from ...utils import configure_sentry
from ...workers import get_worker_class


class Command(BaseCommand):
    """
    Runs an RQ worker pool with the given number of workers on the
    specified queues. Note that all queues passed to a single
    rqworker-pool command must share the same Redis connection.

    Example usage:
    python manage.py rqworker-pool high medium low --num-workers 4
    """

    args = '<queue queue ...>'

    def add_arguments(self, parser):
        parser.add_argument('--num-workers', action='store', dest='num_workers',
                            type=int, default=1, help='Number of workers to spawn')
        parser.add_argument('--worker-class', action='store', dest='worker_class',
                            help='RQ Worker class to use')
        parser.add_argument('--pid', action='store', dest='pid',
                            default=None, help="PID file to write the pool's PID into")
        parser.add_argument('--burst', action='store_true', dest='burst',
                            default=False, help='Run worker in burst mode')
        parser.add_argument('--queue-class', action='store', dest='queue_class',
                            help='Queue class to use')
        parser.add_argument('--job-class', action='store', dest='job_class',
                            help='Job class to use')
        parser.add_argument('--serializer', action='store', dest='serializer',
                            default='rq.serializers.DefaultSerializer',
                            help='Serializer class to use')
        parser.add_argument('args', nargs='*', type=str,
                            help='The queues to work on, separated by space')

        # Args present in `rqworker` command but not yet implemented here
        # parser.add_argument('--worker-ttl', action='store', type=int,
        #                     dest='worker_ttl', default=420,
        #                     help='Default worker timeout to be used')
        # parser.add_argument('--max-jobs', action='store', default=None, dest='max_jobs', type=int,
        #                     help='Maximum number of jobs to execute')
        # parser.add_argument('--with-scheduler', action='store_true', dest='with_scheduler',
        #                     default=False, help='Run worker with scheduler enabled')

        # Sentry arguments
        parser.add_argument('--sentry-dsn', action='store', default=None, dest='sentry_dsn',
                            help='Report exceptions to this Sentry DSN')
        parser.add_argument('--sentry-ca-certs', action='store', default=None, dest='sentry_ca_certs',
                            help='A path to an alternative CA bundle file in PEM-format')
        parser.add_argument('--sentry-debug', action='store', default=False, dest='sentry_debug',
                            help='Turn Sentry debug mode on or off')

    def handle(self, *args, **options):
        pid = options.get('pid')
        if pid:
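            # Write this process's PID so external process managers can track the pool.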
            with open(os.path.expanduser(pid), "w") as fp:
                fp.write(str(os.getpid()))

        # Verbosity is defined by default in BaseCommand for all commands
        verbosity: int = options['verbosity']
        if verbosity >= 2:
            logging_level = 'DEBUG'
        elif verbosity == 0:
            logging_level = 'WARNING'
        else:
            logging_level = 'INFO'
        setup_loghandlers(logging_level)

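        # Sentry reporting is optional; configure_sentry raises ImportError
        # when sentry-sdk is not installed, which is surfaced as a clean exit.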
        sentry_dsn = options.pop('sentry_dsn')
        if sentry_dsn:
            try:
                configure_sentry(sentry_dsn, **options)
            except ImportError:
                self.stderr.write("Please install sentry-sdk using `pip install sentry-sdk`")
                sys.exit(1)

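        # Resolve the job, queue, worker, and serializer classes from the
        # CLI options, falling back to the django-rq defaults when unset.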
        job_class = get_job_class(options['job_class'])
        queues = get_queues(*args, job_class=job_class, queue_class=options['queue_class'])
        worker_class = get_worker_class(options['worker_class'])
        serializer = resolve_serializer(options['serializer'])

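        # All queues must share one Redis connection (see the class docstring),
        # so the first queue's connection serves the entire pool.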
        pool = WorkerPool(
            queues=queues,
            connection=queues[0].connection,
            num_workers=options['num_workers'],
            serializer=serializer,
            worker_class=worker_class,
            job_class=job_class,
        )
        pool.start(burst=options.get('burst', False), logging_level=logging_level)
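
For reference, the pool can also be started programmatically through Django's
management API; a minimal sketch, assuming hypothetical queue names 'default'
and 'high' are configured in RQ_QUEUES:

    # Sketch only: the queue names below are illustrative, not part of django-rq.
    from django.core.management import call_command

    # Equivalent to: python manage.py rqworker-pool default high --num-workers 4 --burst
    call_command('rqworker-pool', 'default', 'high', num_workers=4, burst=True)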