# Tests for worker commands (shutdown, kill-horse, stop-job) and the worker pubsub thread.
import time
from multiprocessing import Process
from unittest import mock
from redis import Redis
from redis.exceptions import ResponseError
from rq import Queue, Worker
from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command
from rq.exceptions import InvalidJobOperation, NoSuchJobError
from rq.serializers import JSONSerializer
from rq.worker import WorkerStatus
from tests import RQTestCase
from tests.fixtures import _send_kill_horse_command, _send_shutdown_command, long_running_job, raise_exc_mock
def start_work(queue_name, worker_name, connection_kwargs):
    """Build a named worker on its own Redis connection and run its work loop.

    The connection is created from ``connection_kwargs`` inside this function
    rather than being passed in. It is used below as a ``Process`` target.
    """
    redis_connection = Redis(**connection_kwargs)
    Worker(queue_name, name=worker_name, connection=redis_connection).work()
def start_work_burst(queue_name, worker_name, connection_kwargs):
    """Build a named JSON-serializing worker and run it with ``burst=True``.

    The Redis connection is created from ``connection_kwargs`` inside this
    function. It is used below as a ``Process`` target.
    """
    redis_connection = Redis(**connection_kwargs)
    burst_worker = Worker(
        queue_name,
        name=worker_name,
        connection=redis_connection,
        serializer=JSONSerializer,
    )
    burst_worker.work(burst=True)
class TestCommands(RQTestCase):
    """Tests for commands sent to workers and for the worker's pubsub thread."""

    def test_shutdown_command(self):
        """Ensure that shutdown command works properly."""
        connection = self.connection
        worker = Worker('foo', connection=connection)

        # The helper process sends the command while this process is blocked
        # inside worker.work(); the test only completes if work() returns.
        p = Process(
            target=_send_shutdown_command, args=(worker.name, connection.connection_pool.connection_kwargs.copy())
        )
        p.start()
        worker.work()
        p.join(1)

    def test_pubsub_thread_survives_connection_error(self):
        """Ensure that the pubsub thread is still alive after its Redis connection is killed"""
        connection = self.connection
        worker = Worker('foo', connection=connection)
        worker.subscribe()

        self.assertTrue(worker.pubsub_thread.is_alive())

        # Kill the Redis connection
        for client in connection.client_list():
            try:
                connection.client_kill(client['addr'])
            except ResponseError:
                # Deliberately best-effort: a client that cannot be killed is skipped.
                pass

        time.sleep(0.0)  # Allow other threads to run
        self.assertTrue(worker.pubsub_thread.is_alive())

    def test_pubsub_thread_exits_other_error(self):
        """Ensure that the pubsub thread exits on other than redis.exceptions.ConnectionError"""
        connection = self.connection
        worker = Worker('foo', connection=connection)

        with mock.patch('redis.client.PubSub.get_message', new_callable=raise_exc_mock):
            worker.subscribe()
            # Join while the patch is still active so the thread cannot end up
            # calling the real get_message after the patch has been undone.
            worker.pubsub_thread.join()

        self.assertFalse(worker.pubsub_thread.is_alive())

    def test_kill_horse_command(self):
        """Ensure that kill-horse command works properly."""
        connection = self.connection
        queue = Queue('foo', connection=connection)
        job = queue.enqueue(long_running_job, 4)
        worker = Worker('foo', connection=connection)

        # Busy worker: the helper process sends the command while the worker
        # runs in burst mode; the job is then expected in the failed registry.
        p = Process(
            target=_send_kill_horse_command, args=(worker.name, connection.connection_pool.connection_kwargs.copy())
        )
        p.start()
        worker.work(burst=True)
        p.join(1)
        job.refresh()
        self.assertIn(job.id, queue.failed_job_registry)

        # Idle worker: run a worker with the same name in a separate process.
        # join(2) waits at most two seconds; presumably this just gives the
        # worker time to start, since start_work does not return on its own.
        p = Process(target=start_work, args=('foo', worker.name, connection.connection_pool.connection_kwargs.copy()))
        p.start()
        p.join(2)

        send_kill_horse_command(connection, worker.name)
        worker.refresh()
        # Since worker is not busy, command will be ignored
        self.assertEqual(worker.get_state(), WorkerStatus.IDLE)
        send_shutdown_command(connection, worker.name)

    def test_stop_job_command(self):
        """Ensure that stop_job command works properly."""
        connection = self.connection
        queue = Queue('foo', connection=connection, serializer=JSONSerializer)
        job = queue.enqueue(long_running_job, 3)
        worker = Worker('foo', connection=connection, serializer=JSONSerializer)

        # If job is not executing, an error is raised
        with self.assertRaises(InvalidJobOperation):
            send_stop_job_command(connection, job_id=job.id, serializer=JSONSerializer)

        # An exception is raised if job ID is invalid
        with self.assertRaises(NoSuchJobError):
            send_stop_job_command(connection, job_id='1', serializer=JSONSerializer)

        # Run a burst worker with the same name in a separate process, then
        # pause briefly before sending commands to it.
        p = Process(
            target=start_work_burst, args=('foo', worker.name, connection.connection_pool.connection_kwargs.copy())
        )
        p.start()
        p.join(1)

        time.sleep(0.1)

        send_command(connection, worker.name, 'stop-job', job_id=1)
        time.sleep(0.25)
        # Worker still working due to job_id mismatch
        worker.refresh()
        self.assertEqual(worker.get_state(), WorkerStatus.BUSY)

        send_stop_job_command(connection, job_id=job.id, serializer=JSONSerializer)
        time.sleep(0.25)

        # Job status is set appropriately
        self.assertTrue(job.is_stopped)

        # Worker has stopped working
        worker.refresh()
        self.assertEqual(worker.get_state(), WorkerStatus.IDLE)
|