| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 
 | """
This file contains all jobs that are used in tests.  Each of these test
fixtures has slightly different characteristics.
"""
import os
import signal
import subprocess
import sys
import time
from multiprocessing import Process
from typing import Optional
from redis import Redis
from rq import Queue, get_current_job
from rq.command import send_kill_horse_command, send_shutdown_command
from rq.defaults import DEFAULT_JOB_MONITORING_INTERVAL
from rq.job import Job
from rq.suspension import resume
from rq.worker import HerokuWorker, Worker
def say_pid():
    """Return the PID of the process that executes this function."""
    current_pid = os.getpid()
    return current_pid
def say_hello(name=None):
    """Return a greeting for ``name``; 'Stranger' is used when ``name`` is None."""
    who = 'Stranger' if name is None else name
    return 'Hi there, %s!' % (who,)
async def say_hello_async(name=None):
    """An async job with a single argument; returns the greeting built by ``say_hello``."""
    greeting = say_hello(name)
    return greeting
def say_hello_unicode(name=None):
    """Return the greeting built by ``say_hello``, passed through ``str``."""
    greeting = say_hello(name)
    return str(greeting)  # noqa
def do_nothing():
    """The best job in the world: it does no work and returns ``None``."""
    return None
def raise_exc(*args, **kwargs):
    """Accept and ignore any arguments, then always raise a plain ``Exception``."""
    error = Exception('raise_exc error')
    raise error
def raise_exc_mock():
    """Return the ``raise_exc`` function object itself; it is not called here."""
    failing_callable = raise_exc
    return failing_callable
def div_by_zero(x):
    """Divide ``x`` by zero to provoke a division-by-zero exception."""
    divisor = 0
    return x / divisor
def long_process():
    """Sleep for 60 seconds, then return ``None``."""
    one_minute = 60
    time.sleep(one_minute)
def some_calculation(x, y, z=1):
    """Multiply ``x`` by ``y`` and divide the product by ``z``.

    Choose ``z`` smartly (i.e. zero) if you want a division by zero exception.
    """
    product = x * y
    return product / z
def rpush(key, value, connection_kwargs: dict, append_worker_name=False, sleep=0):
    """Append ``value`` to the Redis list at ``key``.

    Useful for detecting the order in which jobs were executed.

    :param key: name of the Redis list to push onto
    :param value: value to push
    :param connection_kwargs: keyword arguments used to build the Redis client
    :param append_worker_name: when truthy, ':' plus the current job's
        ``worker_name`` is appended to ``value`` before pushing
    :param sleep: when truthy, number of seconds to sleep before doing anything
    """
    if sleep:
        time.sleep(sleep)
    if append_worker_name:
        suffix = ':' + get_current_job().worker_name
        value += suffix
    Redis(**connection_kwargs).rpush(key, value)
def check_dependencies_are_met():
    """Return whatever the current job's ``dependencies_are_met()`` reports."""
    current = get_current_job()
    return current.dependencies_are_met()
def create_file(path):
    """Write a short sentinel text to ``path``, leaving evidence that the job ran.

    The file is opened in ``'w'`` mode, so any existing content is replaced.
    """
    sentinel = 'Just a sentinel.'
    with open(path, 'w') as handle:
        handle.write(sentinel)
def create_file_after_timeout(path, timeout):
    """Sleep for ``timeout`` seconds, then create the sentinel file at ``path``."""
    delay = timeout
    time.sleep(delay)
    create_file(path)
def create_file_after_timeout_and_setpgrp(path, timeout):
    """Call ``os.setpgrp()`` first, then delegate to ``create_file_after_timeout``."""
    os.setpgrp()
    create_file_after_timeout(path=path, timeout=timeout)
def launch_process_within_worker_and_store_pid(path, timeout):
    """Start a ``sleep`` child process, write its PID to ``path``, and wait for it.

    :param path: file that receives the child's PID as text
    :param timeout: number of seconds passed to the ``sleep`` command
    """
    command = ['sleep', str(timeout)]
    child = subprocess.Popen(command)
    with open(path, 'w') as pid_file:
        pid_file.write(f'{child.pid}')
    # Block until the child has exited
    child.wait()
def access_self():
    """Assert that ``get_current_job()`` returns a job while this function runs."""
    current = get_current_job()
    assert current is not None
def modify_self(meta):
    """Merge ``meta`` into the current job's ``meta`` mapping and save the job."""
    current = get_current_job()
    current.meta.update(meta)
    current.save()
def modify_self_and_error(meta):
    """Merge ``meta`` into the current job's ``meta``, save the job, then fail.

    The final expression divides by zero, so this never returns normally.
    """
    current = get_current_job()
    current.meta.update(meta)
    current.save()
    # Always raises ZeroDivisionError, after the meta has been saved
    return 1 / 0
def echo(*args, **kwargs):
    """Return the received arguments unchanged as an ``(args, kwargs)`` pair."""
    received = (args, kwargs)
    return received
class Number:
    """Holds a single value and offers one classmethod and one instance method."""

    def __init__(self, value):
        self.value = value

    @classmethod
    def divide(cls, x, y):
        # NOTE(review): despite its name, this returns the product of x and y.
        product = x * y
        return product

    def div(self, y):
        """Return the stored value divided by ``y``."""
        quotient = self.value / y
        return quotient
class CallableObject:
    """An object whose instances can be called like a function."""

    def __call__(self):
        message = "I'm callable"
        return message
class UnicodeStringObject:
    """An object whose ``repr`` is a single non-ASCII character."""

    def __repr__(self):
        text = 'é'
        return text
class ClassWithAStaticMethod:
    """A class that exposes a single static method."""

    @staticmethod
    def static_method():
        message = "I'm a static method"
        return message
def black_hole(job, *exc_info):
    """Ignore the job and exception info and always return ``False``."""
    # False means: don't fall through to default behaviour (moving to failed queue)
    fall_through = False
    return fall_through
def add_meta(job, *exc_info):
    """Replace the job's ``meta`` with ``{'foo': 1}``, save the job, and return ``True``.

    Any exception info passed after ``job`` is ignored.
    """
    replacement_meta = {'foo': 1}
    job.meta = replacement_meta
    job.save()
    return True
def save_key_ttl(key):
    """Read the TTL of ``key`` through the current job's connection and store it in the job's meta.

    The job's ``meta`` is replaced by ``{'ttl': <value>}`` and persisted with
    ``save_meta()``.
    """
    current = get_current_job()
    remaining = current.connection.ttl(key)
    current.meta = {'ttl': remaining}
    current.save_meta()
def long_running_job(timeout=10, horse_pid_key=None):
    """Sleep for ``timeout`` seconds and return a fixed message.

    :param timeout: number of seconds to sleep
    :param horse_pid_key: when truthy, the PID of the executing process is
        stored under this key (expiring after 60 seconds) via the current
        job's connection before sleeping
    """
    current = get_current_job()
    if horse_pid_key:
        # Store the PID of the worker horse in a key
        own_pid = os.getpid()
        current.connection.set(horse_pid_key, own_pid, ex=60)
    time.sleep(timeout)
    return 'Done sleeping...'
def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay, connection):
    """
    Run the work horse for a simplified heroku worker where perform_job just
    creates a 'started' sentinel file, waits about 2 seconds, and then creates
    a 'finished' sentinel file.

    ``sys.stderr`` is replaced by a file named 'stderr.log' inside the sandbox.

    :param sandbox: directory to create files in
    :param _imminent_shutdown_delay: delay to use for HerokuWorker
    :param connection: connection handed to both the 'dummy' queue and the worker
    """
    stderr_path = os.path.join(sandbox, 'stderr.log')
    sys.stderr = open(stderr_path, 'w')

    class TestHerokuWorker(HerokuWorker):
        imminent_shutdown_delay = _imminent_shutdown_delay

        def perform_job(self, job, queue):
            create_file(os.path.join(sandbox, 'started'))
            # have to sleep in 20 short steps rather than one sleep to avoid
            # holding the GIL and preventing signals being received
            for _ in range(20):
                time.sleep(0.1)
            create_file(os.path.join(sandbox, 'finished'))
            return True

    dummy_queue = Queue('dummy', connection=connection)
    worker = TestHerokuWorker(dummy_queue, connection=connection)
    worker.main_work_horse(None, None)  # type: ignore[no-untyped-call]
class DummyQueue:
    """Empty placeholder class with no attributes or methods of its own."""
def kill_horse(horse_pid_key: str, connection_kwargs: dict, interval: float = 1.5):
    """
    Send SIGKILL to the worker horse process whose PID is stored in a Redis key.
    Nothing is killed when the key holds no (or an empty) value.

    :param horse_pid_key: Redis key where the horse PID is stored
    :param connection_kwargs: Connection parameters for Redis
    :param interval: Time to wait before sending the kill signal
    """
    time.sleep(interval)
    stored_pid = Redis(**connection_kwargs).get(horse_pid_key)
    if not stored_pid:
        return
    os.kill(int(stored_pid), signal.SIGKILL)
def kill_worker(pid: int, double_kill: bool, interval: float = 1.5):
    """
    Send SIGTERM to ``pid`` once, or twice when ``double_kill`` is truthy.

    :param pid: PID of the process to signal
    :param double_kill: whether to send a second SIGTERM
    :param interval: seconds to wait before each signal
    """
    # The first wait lets the worker get started over on the main process;
    # the second one gives the worker time to switch signal handler.
    signals_to_send = 2 if double_kill else 1
    for _ in range(signals_to_send):
        time.sleep(interval)
        os.kill(pid, signal.SIGTERM)
def resume_worker(connection_kwargs: dict, interval: float = 1):
    """Wait ``interval`` seconds, then call ``resume`` with a new Redis connection.

    :param connection_kwargs: keyword arguments used to build the Redis client
    :param interval: seconds to wait before resuming
    """
    time.sleep(interval)
    connection = Redis(**connection_kwargs)
    resume(connection)
class Serializer:
    """Stub whose ``loads`` and ``dumps`` take no data and return ``None``."""

    def loads(self):
        return None

    def dumps(self):
        return None
def start_worker(queue_name, conn_kwargs, worker_name, burst, job_monitoring_interval=None):
    """
    Build a Worker for a single queue and run it. We accept only serializable
    args, so that this can be executed via multiprocessing.

    :param queue_name: name of the queue the worker listens on
    :param conn_kwargs: keyword arguments used to build the Redis connection
    :param worker_name: name given to the worker
    :param burst: forwarded to ``Worker.work(burst=...)``
    :param job_monitoring_interval: a falsy value falls back to
        DEFAULT_JOB_MONITORING_INTERVAL
    """
    # NOTE: stdout is not silenced here. An approach based on opening
    # os.devnull and contextlib.redirect_stdout (see
    # https://stackoverflow.com/a/28321717/14153673) is currently disabled.
    connection = Redis(**conn_kwargs)
    monitoring_interval = job_monitoring_interval or DEFAULT_JOB_MONITORING_INTERVAL
    worker = Worker(
        [queue_name],
        name=worker_name,
        connection=connection,
        job_monitoring_interval=monitoring_interval,
    )
    worker.work(burst=burst)
def start_worker_process(
    queue_name, connection, worker_name=None, burst=False, job_monitoring_interval: Optional[int] = None
) -> Process:
    """
    Use multiprocessing to start a new worker in a separate process.

    The connection itself is not handed over; only the keyword arguments of its
    connection pool are passed to ``start_worker``.

    :return: the already started Process
    """
    conn_kwargs = connection.connection_pool.connection_kwargs
    worker_args = (queue_name, conn_kwargs, worker_name, burst, job_monitoring_interval)
    process = Process(target=start_worker, args=worker_args)
    process.start()
    return process
def burst_two_workers(queue, connection: Redis, timeout=2, tries=5, pause=0.1):
    """
    Get two workers working simultaneously in burst mode, on a given queue.
    Return after both workers have finished handling jobs, up to a fixed timeout
    on the worker that runs in another process.

    :param queue: queue both workers consume from
    :param connection: Redis connection used by both workers
    :param timeout: seconds to wait when joining the worker process
    :param tries: maximum number of checks for the first job having started
    :param pause: seconds to sleep between those checks
    """
    other_process = start_worker_process(queue.name, worker_name='w1', burst=True, connection=connection)
    local_worker = Worker(queue, name='w2', connection=connection)
    pending = queue.jobs
    if pending:
        first_job = pending[0]
        # Give the first worker process time to get started on the first job.
        # This is helpful in tests where we want to control which worker takes which job.
        attempts = 0
        while attempts < tries:
            if first_job.is_started:
                break
            time.sleep(pause)
            attempts += 1
    # Now can start the second worker.
    local_worker.work(burst=True)
    other_process.join(timeout)
def save_result(job, connection, result):
    """Store job result in a 'success_callback:'-prefixed key that expires after 60 seconds"""
    key = 'success_callback:%s' % job.id
    connection.set(key, result, ex=60)
def save_exception(job, connection, type, value, traceback):
    """Store ``str(value)`` in a 'failure_callback:'-prefixed key that expires after 60 seconds

    ``type`` and ``traceback`` are accepted but not used.
    """
    key = 'failure_callback:%s' % job.id
    message = str(value)
    connection.set(key, message, ex=60)
def save_result_if_not_stopped(job, connection, result=''):
    """Store ``result`` (default: empty string) in a 'stopped_callback:'-prefixed key that expires after 60 seconds"""
    key = 'stopped_callback:%s' % job.id
    connection.set(key, result, ex=60)
def erroneous_callback(job):
    """A callback that's not written properly: it takes only ``job`` and does nothing."""
    return None
def _send_shutdown_command(worker_name, connection_kwargs, delay=0.25):
    """Waits delay before sending shutdown command"""
    time.sleep(delay)
    connection = Redis(**connection_kwargs)
    send_shutdown_command(connection, worker_name)
def _send_kill_horse_command(worker_name, connection_kwargs, delay=0.25):
    """Sleeps for ``delay`` seconds, then sends the kill-horse command to ``worker_name``"""
    time.sleep(delay)
    connection = Redis(**connection_kwargs)
    send_kill_horse_command(connection, worker_name)
class CustomJob(Job):
    """A Job subclass that adds nothing of its own; it exists just to test a custom job class"""
 |