File: rqworker.py

package info (click to toggle)
django-rq 3.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 648 kB
  • sloc: python: 3,184; makefile: 7
file content (102 lines) | stat: -rw-r--r-- 4,694 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import os
import sys

from redis.exceptions import ConnectionError
from rq.logutils import setup_loghandlers

from django.core.management.base import BaseCommand

from ...workers import get_worker
from ...utils import configure_sentry, reset_db_connections


class Command(BaseCommand):
    """
    Runs RQ workers on specified queues. Note that all queues passed into a
    single rqworker command must share the same connection.

    Example usage:
    python manage.py rqworker high medium low
    """

    args = '<queue queue ...>'

    def add_arguments(self, parser):
        parser.add_argument('--worker-class', action='store', dest='worker_class',
                            help='RQ Worker class to use')
        parser.add_argument('--pid', action='store', dest='pid',
                            default=None, help='PID file to write the worker`s pid into')
        parser.add_argument('--burst', action='store_true', dest='burst',
                            default=False, help='Run worker in burst mode')
        parser.add_argument('--with-scheduler', action='store_true', dest='with_scheduler',
                            default=False, help='Run worker with scheduler enabled')
        parser.add_argument('--name', action='store', dest='name',
                            default=None, help='Name of the worker')
        parser.add_argument('--queue-class', action='store', dest='queue_class',
                            help='Queues class to use')
        parser.add_argument('--job-class', action='store', dest='job_class',
                            help='Jobs class to use')
        parser.add_argument('--worker-ttl', action='store', type=int,
                            dest='worker_ttl', default=420,
                            help='Default worker timeout to be used')
        parser.add_argument('--sentry-dsn', action='store', default=None, dest='sentry_dsn',
                            help='Report exceptions to this Sentry DSN')
        parser.add_argument('--sentry-ca-certs', action='store', default=None, dest='sentry_ca_certs',
                            help='A path to an alternative CA bundle file in PEM-format')
        parser.add_argument('--sentry-debug', action='store', default=False, dest='sentry_debug',
                            help='Turns debug mode on or off.')
        parser.add_argument('--max-jobs', action='store', default=None, dest='max_jobs', type=int,
                            help='Maximum number of jobs to execute')
        parser.add_argument('--max-idle-time', action='store', default=None, dest='max_idle_time', type=int,
                            help='Seconds to wait for job before shutting down')
        parser.add_argument('--serializer', action='store', default='rq.serializers.DefaultSerializer', dest='serializer',
                            help='Specify a custom Serializer.')
        parser.add_argument('args', nargs='*', type=str,
                            help='The queues to work on, separated by space')

    def handle(self, *args, **options):
        pid = options.get('pid')
        if pid:
            with open(os.path.expanduser(pid), "w") as fp:
                fp.write(str(os.getpid()))

        # Verbosity is defined by default in BaseCommand for all commands
        verbosity = options['verbosity']
        if verbosity >= 2:
            level = 'DEBUG'
        elif verbosity == 0:
            level = 'WARNING'
        else:
            level = 'INFO'
        setup_loghandlers(level)

        sentry_dsn = options.pop('sentry_dsn')
        if sentry_dsn:
            try:
                configure_sentry(sentry_dsn, **options)
            except ImportError:
                self.stderr.write("Please install sentry-sdk using `pip install sentry-sdk`")
                sys.exit(1)

        try:
            # Instantiate a worker
            worker_kwargs = {
                'worker_class': options['worker_class'],
                'queue_class': options['queue_class'],
                'job_class': options['job_class'],
                'name': options['name'],
                'worker_ttl': options['worker_ttl'],
                'serializer': options['serializer']
            }
            w = get_worker(*args, **worker_kwargs)

            # Close any opened DB connection before any fork
            reset_db_connections()

            w.work(
                burst=options.get('burst', False), with_scheduler=options.get('with_scheduler', False),
                logging_level=level, max_jobs=options['max_jobs'], max_idle_time=options['max_idle_time']
            )
        except ConnectionError as e:
            self.stderr.write(str(e))
            sys.exit(1)