File: test_commands.py

package info (click to toggle)
python-rq 2.3.3-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 2,428 kB
  • sloc: python: 12,349; makefile: 22; sh: 19
file content (133 lines) | stat: -rw-r--r-- 4,955 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import time
from multiprocessing import Process
from unittest import mock

from redis import Redis
from redis.exceptions import ResponseError

from rq import Queue, Worker
from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command
from rq.exceptions import InvalidJobOperation, NoSuchJobError
from rq.serializers import JSONSerializer
from rq.worker import WorkerStatus
from tests import RQTestCase
from tests.fixtures import _send_kill_horse_command, _send_shutdown_command, long_running_job, raise_exc_mock


def start_work(queue_name, worker_name, connection_kwargs):
    worker = Worker(queue_name, name=worker_name, connection=Redis(**connection_kwargs))
    worker.work()


def start_work_burst(queue_name, worker_name, connection_kwargs):
    worker = Worker(queue_name, name=worker_name, connection=Redis(**connection_kwargs), serializer=JSONSerializer)
    worker.work(burst=True)


class TestCommands(RQTestCase):
    def test_shutdown_command(self):
        """Ensure that shutdown command works properly."""
        connection = self.connection
        worker = Worker('foo', connection=connection)

        p = Process(
            target=_send_shutdown_command, args=(worker.name, connection.connection_pool.connection_kwargs.copy())
        )
        p.start()
        worker.work()
        p.join(1)

    def test_pubsub_thread_survives_connection_error(self):
        """Ensure that the pubsub thread is still alive after its Redis connection is killed"""
        connection = self.connection
        worker = Worker('foo', connection=connection)
        worker.subscribe()

        assert worker.pubsub_thread.is_alive()

        # Kill the Redis connection
        for client in connection.client_list():
            try:
                connection.client_kill(client['addr'])
            except ResponseError:
                pass

        time.sleep(0.0)  # Allow other threads to run
        assert worker.pubsub_thread.is_alive()

    def test_pubsub_thread_exits_other_error(self):
        """Ensure that the pubsub thread  exits on other than redis.exceptions.ConnectionError"""
        connection = self.connection
        worker = Worker('foo', connection=connection)

        with mock.patch('redis.client.PubSub.get_message', new_callable=raise_exc_mock):
            worker.subscribe()

        worker.pubsub_thread.join()
        assert not worker.pubsub_thread.is_alive()

    def test_kill_horse_command(self):
        """Ensure that shutdown command works properly."""
        connection = self.connection
        queue = Queue('foo', connection=connection)
        job = queue.enqueue(long_running_job, 4)
        worker = Worker('foo', connection=connection)

        p = Process(
            target=_send_kill_horse_command, args=(worker.name, connection.connection_pool.connection_kwargs.copy())
        )
        p.start()
        worker.work(burst=True)
        p.join(1)
        job.refresh()
        self.assertTrue(job.id in queue.failed_job_registry)

        p = Process(target=start_work, args=('foo', worker.name, connection.connection_pool.connection_kwargs.copy()))
        p.start()
        p.join(2)

        send_kill_horse_command(connection, worker.name)
        worker.refresh()
        # Since worker is not busy, command will be ignored
        self.assertEqual(worker.get_state(), WorkerStatus.IDLE)
        send_shutdown_command(connection, worker.name)

    def test_stop_job_command(self):
        """Ensure that stop_job command works properly."""

        connection = self.connection
        queue = Queue('foo', connection=connection, serializer=JSONSerializer)
        job = queue.enqueue(long_running_job, 3)
        worker = Worker('foo', connection=connection, serializer=JSONSerializer)

        # If job is not executing, an error is raised
        with self.assertRaises(InvalidJobOperation):
            send_stop_job_command(connection, job_id=job.id, serializer=JSONSerializer)

        # An exception is raised if job ID is invalid
        with self.assertRaises(NoSuchJobError):
            send_stop_job_command(connection, job_id='1', serializer=JSONSerializer)

        p = Process(
            target=start_work_burst, args=('foo', worker.name, connection.connection_pool.connection_kwargs.copy())
        )
        p.start()
        p.join(1)

        time.sleep(0.1)

        send_command(connection, worker.name, 'stop-job', job_id=1)
        time.sleep(0.25)
        # Worker still working due to job_id mismatch
        worker.refresh()
        self.assertEqual(worker.get_state(), WorkerStatus.BUSY)

        send_stop_job_command(connection, job_id=job.id, serializer=JSONSerializer)
        time.sleep(0.25)

        # Job status is set appropriately
        self.assertTrue(job.is_stopped)

        # Worker has stopped working
        worker.refresh()
        self.assertEqual(worker.get_state(), WorkerStatus.IDLE)