File: tests.py

package info (click to toggle)
python-django-tasks-rq 0.12.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 160 kB
  • sloc: python: 682; sh: 5; makefile: 4
file content (480 lines) | stat: -rw-r--r-- 18,923 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
import os
import uuid
from typing import cast
from unittest.mock import patch

import django_rq
from asgiref.sync import async_to_sync
from django import VERSION
from django.core.exceptions import SuspiciousOperation
from django.test import SimpleTestCase, override_settings
from django_tasks import TaskResultStatus, default_task_backend, task_backends
from django_tasks.base import Task
from django_tasks.exceptions import InvalidTaskError, TaskResultDoesNotExist
from fakeredis import FakeRedis, FakeStrictRedis
from rq.defaults import UNSERIALIZABLE_RETURN_VALUE_PAYLOAD
from rq.timeouts import TimerDeathPenalty

from django_tasks_rq import compat
from django_tasks_rq.backend import Job, RQBackend
from tests import tasks as test_tasks


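# RQ
# Configuration to pretend there is a Redis service available.
# Set up the connection before RQ Django reads the settings.
# Every connection must reach the same fake server, because separate fakeredis
# servers do not share state. No singleton is stored here: each call builds a
# new client from the queue config.
# NOTE(review): this presumably relies on fakeredis routing clients with the
# same connection settings to one shared in-memory server -- confirm against
# the fakeredis version in use.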
def get_fake_connection(config: dict, strict: bool) -> FakeRedis | FakeStrictRedis:
    """Build a fakeredis client from a django-rq style queue configuration.

    The test case below patches django-rq's ``get_redis_connection`` with
    this function, so it is called with the same two arguments.

    Args:
        config: Queue settings. When it has a ``"URL"`` key the client is
            created from that URL (with the optional ``"DB"`` value passed
            along); otherwise ``"HOST"`` and ``"PORT"`` are required and
            ``"DB"`` (default ``0``), ``"USERNAME"`` and ``"PASSWORD"`` are
            optional.
        strict: Selects ``FakeStrictRedis`` when true, ``FakeRedis`` otherwise.

    Returns:
        A newly constructed fake Redis client.

    Raises:
        KeyError: If ``config`` has no ``"URL"`` and lacks ``"HOST"`` or
            ``"PORT"``.
    """
    if strict:
        client_factory = FakeStrictRedis
    else:
        client_factory = FakeRedis

    if "URL" not in config:
        # Host/port style configuration.
        return client_factory(
            host=config["HOST"],
            port=config["PORT"],
            db=config.get("DB", 0),
            username=config.get("USERNAME"),
            password=config.get("PASSWORD"),
        )

    return client_factory.from_url(config["URL"], db=config.get("DB"))


class RQBackendTestCase(SimpleTestCase):
    """Exercise ``RQBackend`` against django-rq queues backed by fakeredis."""

    def setUp(self) -> None:
        """Route django-rq's Redis connections to fakeredis and wipe its data."""
        super().setUp()

        # The import only probes where ``get_redis_connection`` lives so the
        # right attribute gets patched.
        try:
            from django_rq.connection_utils import get_redis_connection  # noqa: F401
        except ImportError:
            # django-rq < 3.2
            patch_target = "django_rq.queues.get_redis_connection"
        else:
            patch_target = "django_rq.connection_utils.get_redis_connection"

        connection_patch = patch(patch_target, get_fake_connection)
        connection_patch.start()
        self.addCleanup(connection_patch.stop)

        # Start every test from an empty fake Redis.
        django_rq.get_connection().flushall()

    def run_worker(self) -> None:
        """Process each of the backend's queues with a burst ``SimpleWorker``.

        Each worker run is wrapped in ``assertLogs("rq.worker")``, so a run
        that logs nothing on that logger fails the calling test.
        """
        from rq import SimpleWorker

        for rq_queue in default_task_backend._get_queues():  # type: ignore[attr-defined]
            burst_worker = SimpleWorker(
                [rq_queue], prepare_for_work=False, job_class=Job
            )

            # Use timer death penalty to support Windows
            burst_worker.death_penalty_class = TimerDeathPenalty

            # HACK: Work around fakeredis not supporting `CLIENT LIST`
            burst_worker.hostname = "example-hostname"
            burst_worker.pid = os.getpid()

            with self.assertLogs("rq.worker"):
                burst_worker.work(burst=True)

    def test_using_correct_backend(self) -> None:
        """The default backend is an ``RQBackend`` aliased ``default`` without options."""
        configured_backend = task_backends["default"]

        self.assertEqual(default_task_backend, configured_backend)
        self.assertIsInstance(configured_backend, RQBackend)
        self.assertEqual(default_task_backend.alias, "default")
        self.assertEqual(default_task_backend.options, {})

    def test_enqueue_task(self) -> None:
        """Enqueueing a sync or async task yields a READY result with no attempts."""
        for candidate in (test_tasks.noop_task, test_tasks.noop_task_async):
            with self.subTest(candidate):
                enqueued = cast(Task, candidate).enqueue(1, two=3)

                self.assertEqual(enqueued.status, TaskResultStatus.READY)
                self.assertFalse(enqueued.is_finished)
                for timestamp in (
                    enqueued.started_at,
                    enqueued.last_attempted_at,
                    enqueued.finished_at,
                ):
                    self.assertIsNone(timestamp)
                with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
                    enqueued.return_value  # noqa:B018
                self.assertEqual(enqueued.task, candidate)
                self.assertEqual(enqueued.args, [1])
                self.assertEqual(enqueued.kwargs, {"two": 3})
                self.assertEqual(enqueued.attempts, 0)

    async def test_enqueue_task_async(self) -> None:
        """``aenqueue`` without arguments yields a READY result with no attempts."""
        for candidate in (test_tasks.noop_task, test_tasks.noop_task_async):
            with self.subTest(candidate):
                enqueued = await cast(Task, candidate).aenqueue()

                self.assertEqual(enqueued.status, TaskResultStatus.READY)
                self.assertFalse(enqueued.is_finished)
                for timestamp in (
                    enqueued.started_at,
                    enqueued.last_attempted_at,
                    enqueued.finished_at,
                ):
                    self.assertIsNone(timestamp)
                with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
                    enqueued.return_value  # noqa:B018
                self.assertEqual(enqueued.task, candidate)
                self.assertEqual(enqueued.args, [])
                self.assertEqual(enqueued.kwargs, {})
                self.assertEqual(enqueued.attempts, 0)

    def test_catches_exception(self) -> None:
        """A task that raises ends up FAILED with the exception recorded."""
        # Each scenario: (task function, expected exception, expected message)
        scenarios = [
            (
                test_tasks.failing_task_value_error,
                ValueError,
                "This task failed due to ValueError",
            ),
            (
                test_tasks.failing_task_system_exit,
                SystemExit,
                "This task failed due to SystemExit",
            ),
        ]
        for failing_task, exception, message in scenarios:
            with self.subTest(failing_task):
                outcome = failing_task.enqueue()

                with self.assertLogs("django_tasks", "DEBUG"):
                    self.run_worker()

                outcome.refresh()

                # assert result
                self.assertEqual(outcome.status, TaskResultStatus.FAILED)
                with self.assertRaisesMessage(ValueError, "Task failed"):
                    outcome.return_value  # noqa: B018
                self.assertTrue(outcome.is_finished)
                for timestamp in (
                    outcome.started_at,
                    outcome.last_attempted_at,
                    outcome.finished_at,
                ):
                    self.assertIsNotNone(timestamp)
                self.assertGreaterEqual(outcome.started_at, outcome.enqueued_at)  # type:ignore[arg-type, misc]
                self.assertGreaterEqual(outcome.finished_at, outcome.started_at)  # type:ignore[arg-type, misc]

                first_error = outcome.errors[0]
                self.assertEqual(first_error.exception_class, exception)

                # The traceback must end with the "<ExceptionName>: <message>" line.
                expected_tail = f"{exception.__name__}: {message}\n"
                traceback = first_error.traceback
                self.assertTrue(
                    traceback and traceback.endswith(expected_tail),
                    traceback,
                )

                self.assertEqual(outcome.task, failing_task)
                self.assertEqual(outcome.args, [])
                self.assertEqual(outcome.kwargs, {})
                self.assertEqual(outcome.attempts, 1)

    def test_complex_exception(self) -> None:
        """A nested ``ValueError`` is reported as FAILED with its repr in the traceback."""
        outcome = test_tasks.complex_exception.enqueue()

        with self.assertLogs("django_tasks", "DEBUG"):
            self.run_worker()

        outcome.refresh()

        self.assertEqual(outcome.status, TaskResultStatus.FAILED)
        for timestamp in (
            outcome.started_at,
            outcome.last_attempted_at,
            outcome.finished_at,
        ):
            self.assertIsNotNone(timestamp)
        self.assertGreaterEqual(outcome.started_at, outcome.enqueued_at)  # type:ignore[arg-type,misc]
        self.assertGreaterEqual(outcome.finished_at, outcome.started_at)  # type:ignore[arg-type,misc]

        self.assertIsNone(outcome._return_value)
        first_error = outcome.errors[0]
        self.assertEqual(first_error.exception_class, ValueError)
        self.assertIn(
            'ValueError(ValueError("This task failed"))', first_error.traceback
        )

        self.assertEqual(outcome.task, test_tasks.complex_exception)
        self.assertEqual(outcome.args, [])
        self.assertEqual(outcome.kwargs, {})
        self.assertEqual(outcome.attempts, 1)

    def test_complex_return_value(self) -> None:
        """A result carrying RQ's unserializable-return-value payload is FAILED."""
        outcome = test_tasks.complex_return_value.enqueue()

        with self.assertLogs("django_tasks", "DEBUG"):
            self.run_worker()

        outcome.refresh()

        self.assertEqual(outcome.status, TaskResultStatus.FAILED)
        for timestamp in (
            outcome.started_at,
            outcome.last_attempted_at,
            outcome.finished_at,
        ):
            self.assertIsNotNone(timestamp)
        self.assertGreaterEqual(outcome.started_at, outcome.enqueued_at)  # type:ignore[arg-type,misc]
        self.assertGreaterEqual(outcome.finished_at, outcome.started_at)  # type:ignore[arg-type,misc]

        self.assertIsNone(outcome._return_value)
        first_error = outcome.errors[0]
        self.assertEqual(first_error.exception_class, Exception)
        self.assertIn(UNSERIALIZABLE_RETURN_VALUE_PAYLOAD, first_error.traceback)

    def test_get_result(self) -> None:
        """``get_result`` returns a result equal to the one produced by ``enqueue``."""
        enqueued = default_task_backend.enqueue(test_tasks.noop_task, [], {})

        fetched = default_task_backend.get_result(enqueued.id)

        self.assertEqual(enqueued, fetched)

    async def test_get_result_async(self) -> None:
        """``aget_result`` returns a result equal to the one produced by ``aenqueue``."""
        enqueued = await default_task_backend.aenqueue(test_tasks.noop_task, [], {})

        fetched = await default_task_backend.aget_result(enqueued.id)

        self.assertEqual(enqueued, fetched)

    def test_refresh_result(self) -> None:
        """A result object stays unchanged after the worker runs until ``refresh()``."""
        outcome = default_task_backend.enqueue(
            test_tasks.calculate_meaning_of_life, (), {}
        )

        self.run_worker()

        # Before refreshing, the local object still shows the enqueue-time state.
        self.assertEqual(outcome.status, TaskResultStatus.READY)
        self.assertFalse(outcome.is_finished)
        for timestamp in (
            outcome.started_at,
            outcome.last_attempted_at,
            outcome.finished_at,
        ):
            self.assertIsNone(timestamp)
        self.assertEqual(outcome.attempts, 0)

        outcome.refresh()

        for timestamp in (
            outcome.started_at,
            outcome.last_attempted_at,
            outcome.finished_at,
        ):
            self.assertIsNotNone(timestamp)
        self.assertEqual(outcome.status, TaskResultStatus.SUCCESSFUL)
        self.assertTrue(outcome.is_finished)
        self.assertEqual(outcome.return_value, 42)
        self.assertEqual(outcome.attempts, 1)

    def test_refresh_result_async(self) -> None:
        """Same as ``test_refresh_result`` but through ``aenqueue`` / ``arefresh``."""
        enqueue_sync = async_to_sync(default_task_backend.aenqueue)
        outcome = enqueue_sync(test_tasks.calculate_meaning_of_life, (), {})

        self.run_worker()

        # Before refreshing, the local object still shows the enqueue-time state.
        self.assertEqual(outcome.status, TaskResultStatus.READY)
        self.assertFalse(outcome.is_finished)
        for timestamp in (
            outcome.started_at,
            outcome.last_attempted_at,
            outcome.finished_at,
        ):
            self.assertIsNone(timestamp)
        self.assertEqual(outcome.attempts, 0)

        async_to_sync(outcome.arefresh)()

        for timestamp in (
            outcome.started_at,
            outcome.last_attempted_at,
            outcome.finished_at,
        ):
            self.assertIsNotNone(timestamp)
        self.assertEqual(outcome.status, TaskResultStatus.SUCCESSFUL)
        self.assertTrue(outcome.is_finished)
        self.assertEqual(outcome.return_value, 42)
        self.assertEqual(outcome.attempts, 1)

    def test_get_missing_result(self) -> None:
        """Looking up a random UUID raises ``TaskResultDoesNotExist``."""
        unknown_id = str(uuid.uuid4())
        with self.assertRaises(TaskResultDoesNotExist):
            default_task_backend.get_result(unknown_id)

    async def test_async_get_missing_result(self) -> None:
        """Async lookup of a random UUID raises ``TaskResultDoesNotExist``."""
        unknown_id = str(uuid.uuid4())
        with self.assertRaises(TaskResultDoesNotExist):
            await default_task_backend.aget_result(unknown_id)

    def test_invalid_uuid(self) -> None:
        """Looking up the id ``"123"`` raises ``TaskResultDoesNotExist``."""
        with self.assertRaises(TaskResultDoesNotExist):
            default_task_backend.get_result("123")

    async def test_async_invalid_uuid(self) -> None:
        """Async lookup of the id ``"123"`` raises ``TaskResultDoesNotExist``."""
        with self.assertRaises(TaskResultDoesNotExist):
            await default_task_backend.aget_result("123")

    def test_invalid_task_path(self) -> None:
        """A job whose callable is not a Task is rejected as suspicious."""
        default_queue = django_rq.get_queue("default", job_class=Job)
        job = default_queue.enqueue_call(  # type: ignore[no-untyped-call]
            "subprocess.check_output", args=["exit", "1"]
        )

        expected_message = (
            f"Task {job.id} does not point to a Task (subprocess.check_output)"
        )
        with self.assertRaisesMessage(SuspiciousOperation, expected_message):
            default_task_backend.get_result(job.id)

    def test_check(self) -> None:
        """The backend's checks report no errors when settings are not overridden."""
        problems = list(default_task_backend.check())

        self.assertEqual(len(problems), 0, problems)

    @override_settings(INSTALLED_APPS=[])
    def test_rq_app_missing(self) -> None:
        """With no installed apps, the single check error's hint names ``django_rq``."""
        problems = list(default_task_backend.check())

        self.assertEqual(len(problems), 1)
        self.assertIn("django_rq", problems[0].hint)  # type:ignore[arg-type]

    def test_enqueue_logs(self) -> None:
        """Enqueueing emits exactly one ``django_tasks`` log line naming the result."""
        with self.assertLogs("django_tasks", level="DEBUG") as captured_logs:
            enqueued = test_tasks.noop_task.enqueue()

        log_lines = captured_logs.output
        self.assertEqual(len(log_lines), 1)
        (enqueue_line,) = log_lines

        self.assertIn("enqueued", enqueue_line)
        self.assertIn(enqueued.id, enqueue_line)

    def test_started_finished_logs(self) -> None:
        """Running a successful task logs RUNNING and then SUCCESSFUL for its id."""
        enqueued = test_tasks.noop_task.enqueue()

        with self.assertLogs("django_tasks", level="DEBUG") as captured_logs:
            self.run_worker()

        log_lines = captured_logs.output
        self.assertEqual(len(log_lines), 2)
        running_line, finished_line = log_lines

        self.assertIn("state=RUNNING", running_line)
        self.assertIn(enqueued.id, running_line)

        self.assertIn("state=SUCCESSFUL", finished_line)
        self.assertIn(enqueued.id, finished_line)

    def test_failed_logs(self) -> None:
        """Running a failing task logs RUNNING and then FAILED for its id."""
        enqueued = test_tasks.failing_task_value_error.enqueue()

        with self.assertLogs("django_tasks", level="DEBUG") as captured_logs:
            self.run_worker()

        log_lines = captured_logs.output
        self.assertEqual(len(log_lines), 2)
        running_line, failed_line = log_lines

        self.assertIn("state=RUNNING", running_line)
        self.assertIn(enqueued.id, running_line)

        self.assertIn("state=FAILED", failed_line)
        self.assertIn(enqueued.id, failed_line)

    def test_queue_isolation(self) -> None:
        """Tasks enqueued on different queues land only in their own RQ queue."""
        on_default = test_tasks.noop_task.enqueue()
        on_queue_1 = test_tasks.noop_task.using(queue_name="queue-1").enqueue()

        # Both lookups must succeed through the backend, whatever the queue.
        for enqueued in (on_default, on_queue_1):
            default_task_backend.get_result(enqueued.id)

        self.assertEqual(django_rq.get_queue("default").job_ids, [on_default.id])
        self.assertEqual(django_rq.get_queue("queue-1").job_ids, [on_queue_1.id])

    @override_settings(
        TASKS={"default": {"BACKEND": "django_tasks_rq.RQBackend", "QUEUES": []}}
    )
    def test_uses_rq_queues_for_queue_names(self) -> None:
        """With an empty ``QUEUES`` list the backend exposes ``default`` and ``queue-1``.

        Presumably these are the queue names configured for django-rq in the
        test settings; those settings are not visible from this module.
        """
        expected_queue_names = {"default", "queue-1"}
        self.assertEqual(default_task_backend.queues, expected_queue_names)

    @override_settings(
        TASKS={
            "default": {
                "BACKEND": "django_tasks_rq.RQBackend",
                "QUEUES": ["queue-2"],
            }
        }
    )
    def test_unknown_queue_name(self) -> None:
        """Configuring ``queue-2`` yields one check error hinting to add it to RQ_QUEUES."""
        problems = list(default_task_backend.check())

        self.assertEqual(len(problems), 1)
        self.assertIn("Add 'queue-2' to RQ_QUEUES", problems[0].hint)  # type:ignore[arg-type]

    def test_takes_context(self) -> None:
        """The ``get_task_id`` task returns the id of its own result."""
        outcome = test_tasks.get_task_id.enqueue()

        self.run_worker()

        outcome.refresh()

        self.assertEqual(outcome.return_value, outcome.id)

    def test_context(self) -> None:
        """The ``test_context`` task, enqueued with ``1``, finishes SUCCESSFUL."""
        outcome = test_tasks.test_context.enqueue(1)

        self.run_worker()
        outcome.refresh()

        self.assertEqual(outcome.status, TaskResultStatus.SUCCESSFUL)

    def test_exception_classes_pop_empty_list_bug(self) -> None:
        """Regression test for ``IndexError: pop from empty list`` in ``task_result``.

        A job is made to fail normally, then the recorded exception classes are
        removed from its meta so that a failed result exists without a matching
        exception class. Building ``task_result`` from that job must not raise;
        with the bug present this test fails with the ``IndexError`` above.
        """
        # Produce a genuinely failed job first.
        enqueued = test_tasks.failing_task_value_error.enqueue()

        with self.assertLogs("django_tasks", level="DEBUG"):
            self.run_worker()

        # Fetch the RQ job behind the task result.
        job = cast(RQBackend, default_task_backend)._get_job(enqueued.id)
        self.assertIsNotNone(job)
        assert job is not None

        # The failed job is expected to hold:
        #
        # - 1 failed result in job.results()
        # - 1 exception class in job.meta["_django_tasks_rq_exceptions"]
        #
        # Dropping the meta key reproduces the problematic state: a failed
        # result with no exception class to pair it with.
        job.meta.pop("_django_tasks_rq_exceptions", None)
        job.save_meta()  # type: ignore[no-untyped-call]

        # Drop any cached task_result so the next access recomputes it.
        job.__dict__.pop("task_result", None)

        # Must not raise; with the bug present this is where IndexError surfaces.
        rebuilt = job.task_result

        # The result is still well-formed even without stored exception classes.
        self.assertEqual(rebuilt.status, TaskResultStatus.FAILED)
        self.assertEqual(rebuilt.task, test_tasks.failing_task_value_error)
        self.assertIsInstance(rebuilt.errors, list)
        self.assertEqual(
            rebuilt.errors[0].exception_class_path, "builtins.Exception"
        )

    def test_validate_on_enqueue(self) -> None:
        """``enqueue`` rejects a task whose queue was only valid under overridden settings."""
        settings_with_unknown_queue = {
            "default": {
                "BACKEND": "django_tasks_rq.RQBackend",
                "QUEUES": ["unknown_queue"],
            }
        }
        # Create the task while "unknown_queue" is configured ...
        with override_settings(TASKS=settings_with_unknown_queue):
            task_with_custom_queue_name = test_tasks.noop_task.using(
                queue_name="unknown_queue"
            )

        # ... and enqueue it after the override has been reverted.
        with self.assertRaisesMessage(
            InvalidTaskError, "Queue 'unknown_queue' is not valid for backend"
        ):
            task_with_custom_queue_name.enqueue()

    async def test_validate_on_aenqueue(self) -> None:
        """``aenqueue`` rejects a task whose queue was only valid under overridden settings."""
        settings_with_unknown_queue = {
            "default": {
                "BACKEND": "django_tasks_rq.RQBackend",
                "QUEUES": ["unknown_queue"],
            }
        }
        # Create the task while "unknown_queue" is configured ...
        with override_settings(TASKS=settings_with_unknown_queue):
            task_with_custom_queue_name = test_tasks.noop_task.using(
                queue_name="unknown_queue"
            )

        # ... and enqueue it after the override has been reverted.
        with self.assertRaisesMessage(
            InvalidTaskError, "Queue 'unknown_queue' is not valid for backend"
        ):
            await task_with_custom_queue_name.aenqueue()


class CompatTestCase(SimpleTestCase):
    """Checks on the ``django_tasks_rq.compat`` module."""

    def test_compat_has_django_task(self) -> None:
        """``compat.TASK_CLASSES`` contains every Task class available at runtime.

        The ``django_tasks`` Task is always expected; the ``django.tasks`` one
        is only imported and checked when running on Django 6.0 or newer.
        """
        known_task_classes = compat.TASK_CLASSES
        self.assertIn(Task, known_task_classes)

        if VERSION < (6, 0):
            # Nothing further to check on older Django versions.
            return

        from django.tasks.base import Task as DjangoTask

        self.assertIn(DjangoTask, known_task_classes)