"""Tests for ``rq.Queue``: creation, enqueueing, dequeueing, job dependencies and scheduling."""
import json
from datetime import datetime, timedelta, timezone
from unittest.mock import patch
from rq import Queue, Retry
from rq.job import Job, JobStatus
from rq.registry import (
CanceledJobRegistry,
DeferredJobRegistry,
FailedJobRegistry,
FinishedJobRegistry,
ScheduledJobRegistry,
StartedJobRegistry,
)
from rq.serializers import JSONSerializer
from rq.worker import Worker
from tests import RQTestCase, min_redis_version
from tests.fixtures import echo, say_hello
class MultipleDependencyJob(Job):
    """
    Test helper that fakes multi-dependency support by letting a test inject
    `_dependency_ids` on a freshly created job, leaving the public interface
    of `Job` untouched.
    """

    # Reference to `Job.create`, taken when this class body is executed.
    create_job = Job.create

    @classmethod
    def create(cls, *args, **kwargs):
        """Create a job through `create_job`, then overwrite its `_dependency_ids`.

        The ids are taken from the `_dependency_ids` entry of the `kwargs`
        keyword argument. That whole `kwargs` argument is removed before the
        call is delegated, so it is not forwarded to `create_job`.

        Raises:
            KeyError: if `kwargs` or its `_dependency_ids` entry is missing.
        """
        job_kwargs = kwargs.pop('kwargs')
        injected_ids = job_kwargs.pop('_dependency_ids')

        created = cls.create_job(*args, **kwargs)
        created._dependency_ids = injected_ids
        return created
class TestQueue(RQTestCase):
    """Tests for ``Queue``: creation, inspection, removal, enqueueing,
    dequeueing, job dependencies and registries.

    Every test talks to Redis through ``self.connection``, which is not
    defined in this class and is therefore expected to come from
    ``RQTestCase``.
    """

    def test_create_queue(self):
        """Creating queues."""
        q = Queue('my-queue', connection=self.connection)
        self.assertEqual(q.name, 'my-queue')
        self.assertEqual(str(q), '<Queue my-queue>')

    def test_create_queue_with_serializer(self):
        """Creating queues with serializer."""
        # The stdlib ``json`` module itself is passed as the serializer
        q = Queue('queue-with-serializer', connection=self.connection, serializer=json)
        self.assertEqual(q.name, 'queue-with-serializer')
        self.assertEqual(str(q), '<Queue queue-with-serializer>')
        self.assertIsNotNone(q.serializer)

    def test_create_default_queue(self):
        """Instantiating the default queue."""
        q = Queue(connection=self.connection)
        self.assertEqual(q.name, 'default')

    def test_equality(self):
        """Mathematical equality of queues."""
        q1 = Queue('foo', connection=self.connection)
        q2 = Queue('foo', connection=self.connection)
        q3 = Queue('bar', connection=self.connection)

        self.assertEqual(q1, q2)
        self.assertEqual(q2, q1)
        self.assertNotEqual(q1, q3)
        self.assertNotEqual(q2, q3)
        self.assertGreater(q1, q3)

        # Comparing a queue with a non-queue raises instead of returning False
        self.assertRaises(TypeError, lambda: q1 == 'some string')
        self.assertRaises(TypeError, lambda: q1 < 'some string')

    def test_empty_queue(self):
        """Emptying queues."""
        q = Queue('example', connection=self.connection)

        self.connection.rpush('rq:queue:example', 'foo')
        self.connection.rpush('rq:queue:example', 'bar')
        self.assertFalse(q.is_empty())

        q.empty()

        self.assertTrue(q.is_empty())
        self.assertIsNone(self.connection.lpop('rq:queue:example'))

    def test_empty_removes_jobs(self):
        """Emptying a queue deletes the associated job objects"""
        q = Queue('example', connection=self.connection)
        job = q.enqueue(say_hello)
        self.assertTrue(Job.exists(job.id, connection=self.connection))
        q.empty()
        self.assertFalse(Job.exists(job.id, connection=self.connection))

    def test_queue_is_empty(self):
        """Detecting empty queues."""
        q = Queue('example', connection=self.connection)
        self.assertTrue(q.is_empty())

        self.connection.rpush('rq:queue:example', 'sentinel message')
        self.assertFalse(q.is_empty())

    def test_queue_delete(self):
        """Test queue.delete properly removes queue"""
        q = Queue('example', connection=self.connection)
        job = q.enqueue(say_hello)
        job2 = q.enqueue(say_hello)

        self.assertEqual(2, len(q.get_job_ids()))

        q.delete()

        self.assertEqual(0, len(q.get_job_ids()))
        self.assertFalse(self.connection.exists(job.key))
        self.assertFalse(self.connection.exists(job2.key))
        self.assertEqual(0, len(self.connection.smembers(Queue.redis_queues_keys)))
        self.assertFalse(self.connection.exists(q.key))

    def test_queue_delete_but_keep_jobs(self):
        """Test queue.delete properly removes queue but keeps the job keys in the redis store"""
        q = Queue('example', connection=self.connection)
        job = q.enqueue(say_hello)
        job2 = q.enqueue(say_hello)

        self.assertEqual(2, len(q.get_job_ids()))

        q.delete(delete_jobs=False)

        self.assertEqual(0, len(q.get_job_ids()))
        self.assertTrue(self.connection.exists(job.key))
        self.assertTrue(self.connection.exists(job2.key))
        self.assertEqual(0, len(self.connection.smembers(Queue.redis_queues_keys)))
        self.assertFalse(self.connection.exists(q.key))

    def test_position(self):
        """get_job_position() returns a job's zero-based position in the queue, or None if absent"""
        q = Queue('example', connection=self.connection)
        job = q.enqueue(say_hello)
        job2 = q.enqueue(say_hello)
        job3 = q.enqueue(say_hello)

        self.assertEqual(0, q.get_job_position(job.id))
        self.assertEqual(1, q.get_job_position(job2.id))
        # A job instance is accepted as well as a job ID
        self.assertEqual(2, q.get_job_position(job3))
        self.assertIsNone(q.get_job_position('no_real_job'))

    def test_remove(self):
        """Ensure queue.remove properly removes Job from queue."""
        q = Queue('example', serializer=JSONSerializer, connection=self.connection)

        # Remove by job instance
        job = q.enqueue(say_hello)
        self.assertIn(job.id, q.job_ids)
        q.remove(job)
        self.assertNotIn(job.id, q.job_ids)

        # Remove by job ID
        job = q.enqueue(say_hello)
        self.assertIn(job.id, q.job_ids)
        q.remove(job.id)
        self.assertNotIn(job.id, q.job_ids)

    def test_jobs(self):
        """Getting jobs out of a queue."""
        q = Queue('example', connection=self.connection)
        self.assertEqual(q.jobs, [])
        job = q.enqueue(say_hello)
        self.assertEqual(q.jobs, [job])

        # Deleting job removes it from queue
        job.delete()
        self.assertEqual(q.job_ids, [])

    def test_compact(self):
        """Queue.compact() removes non-existing jobs."""
        q = Queue(connection=self.connection)

        q.enqueue(say_hello, 'Alice')
        q.enqueue(say_hello, 'Charlie')
        # Push two IDs that have no job behind them
        self.connection.lpush(q.key, '1', '2')

        self.assertEqual(q.count, 4)
        self.assertEqual(len(q), 4)

        q.compact()

        self.assertEqual(q.count, 2)
        self.assertEqual(len(q), 2)

    def test_enqueue(self):
        """Enqueueing job onto queues."""
        q = Queue(connection=self.connection)
        self.assertTrue(q.is_empty())

        # The job records the queue it was enqueued on as its origin
        job = q.enqueue(say_hello, 'Nick', foo='bar')
        job_id = job.id
        self.assertEqual(job.origin, q.name)

        # Inspect data inside Redis
        q_key = 'rq:queue:default'
        self.assertEqual(self.connection.llen(q_key), 1)
        self.assertEqual(self.connection.lrange(q_key, 0, -1)[0].decode('ascii'), job_id)

    def test_enqueue_sets_metadata(self):
        """Enqueueing job onto queues modifies meta data."""
        q = Queue(connection=self.connection)
        job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar'), connection=self.connection)

        # Preconditions
        self.assertIsNone(job.enqueued_at)

        # Action
        q.enqueue_job(job)

        # Postconditions
        self.assertIsNotNone(job.enqueued_at)

    def test_pop_job_id(self):
        """Popping job IDs from queues."""
        # Set up
        q = Queue(connection=self.connection)
        uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da'
        q.push_job_id(uuid)

        # Pop it off the queue...
        self.assertEqual(q.count, 1)
        self.assertEqual(q.pop_job_id(), uuid)

        # ...and assert the queue count went down
        self.assertEqual(q.count, 0)

    def test_dequeue_any(self):
        """Fetching work from any given queue."""
        fooq = Queue('foo', connection=self.connection)
        barq = Queue('bar', connection=self.connection)

        # timeout=0 is rejected
        self.assertRaises(ValueError, Queue.dequeue_any, [fooq, barq], timeout=0, connection=self.connection)

        # Nothing enqueued yet
        self.assertIsNone(Queue.dequeue_any([fooq, barq], connection=self.connection, timeout=None))

        # Enqueue a single item
        barq.enqueue(say_hello)
        job, queue = Queue.dequeue_any([fooq, barq], connection=self.connection, timeout=None)
        self.assertEqual(job.func, say_hello)
        self.assertEqual(queue, barq)

        # Enqueue items on both queues
        barq.enqueue(say_hello, 'for Bar')
        fooq.enqueue(say_hello, 'for Foo')

        job, queue = Queue.dequeue_any([fooq, barq], connection=self.connection, timeout=None)
        self.assertEqual(queue, fooq)
        self.assertEqual(job.func, say_hello)
        self.assertEqual(job.origin, fooq.name)
        self.assertEqual(job.args[0], 'for Foo', 'Foo should be dequeued first.')

        job, queue = Queue.dequeue_any([fooq, barq], connection=self.connection, timeout=None)
        self.assertEqual(queue, barq)
        self.assertEqual(job.func, say_hello)
        self.assertEqual(job.origin, barq.name)
        self.assertEqual(job.args[0], 'for Bar', 'Bar should be dequeued second.')

    @min_redis_version((6, 2, 0))
    def test_dequeue_any_reliable(self):
        """Dequeueing job from a single queue moves job to intermediate queue."""
        foo_queue = Queue('foo', connection=self.connection)
        job_1 = foo_queue.enqueue(say_hello)
        self.assertRaises(ValueError, Queue.dequeue_any, [foo_queue], timeout=0, connection=self.connection)

        # Job ID is not in intermediate queue
        self.assertIsNone(self.connection.lpos(foo_queue.intermediate_queue_key, job_1.id))
        job, queue = Queue.dequeue_any([foo_queue], timeout=None, connection=self.connection)
        self.assertEqual(queue, foo_queue)
        self.assertEqual(job.func, say_hello)
        # After job is dequeued, the job ID is in the intermediate queue
        self.assertEqual(self.connection.lpos(foo_queue.intermediate_queue_key, job.id), 0)

        # Test the blocking version
        foo_queue.enqueue(say_hello)
        job, queue = Queue.dequeue_any([foo_queue], timeout=1, connection=self.connection)
        self.assertEqual(queue, foo_queue)
        self.assertEqual(job.func, say_hello)
        # After job is dequeued, the job ID is in the intermediate queue
        self.assertEqual(self.connection.lpos(foo_queue.intermediate_queue_key, job.id), 1)

    @min_redis_version((6, 2, 0))
    def test_intermediate_queue(self):
        """Job should be stuck in intermediate queue if execution fails after dequeued."""
        queue = Queue('foo', connection=self.connection)
        job = queue.enqueue(say_hello)

        # With execute_job mocked out the worker dequeues the job but never runs it,
        # so the job should remain in the intermediate queue and its status is still QUEUED
        with patch.object(Worker, 'execute_job'):
            worker = Worker(queue, connection=self.connection)
            worker.work(burst=True)

        # Job status is still QUEUED even though it's already dequeued
        self.assertEqual(job.get_status(refresh=True), JobStatus.QUEUED)
        self.assertNotIn(job.id, queue.get_job_ids())
        self.assertIsNotNone(self.connection.lpos(queue.intermediate_queue_key, job.id))

    def test_dequeue_any_ignores_nonexisting_jobs(self):
        """Dequeuing (from any queue) silently ignores non-existing jobs."""
        q = Queue('low', connection=self.connection)
        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
        q.push_job_id(uuid)

        # Dequeue simply ignores the missing job and returns None
        self.assertEqual(q.count, 1)
        self.assertIsNone(
            Queue.dequeue_any(
                [Queue(connection=self.connection), Queue('low', connection=self.connection)],
                timeout=None,
                connection=self.connection,
            )
        )
        self.assertEqual(q.count, 0)

    def test_enqueue_with_ttl(self):
        """Zero or negative TTL values are not allowed"""
        queue = Queue(connection=self.connection)
        self.assertRaises(ValueError, queue.enqueue, echo, 1, ttl=0)
        self.assertRaises(ValueError, queue.enqueue, echo, 1, ttl=-1)

    def test_enqueue_sets_status(self):
        """Enqueueing a job sets its status to "queued"."""
        q = Queue(connection=self.connection)
        job = q.enqueue(say_hello)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)

    def test_enqueue_meta_arg(self):
        """enqueue() can set the job.meta contents."""
        q = Queue(connection=self.connection)
        job = q.enqueue(say_hello, meta={'foo': 'bar', 'baz': 42})
        self.assertEqual(job.meta['foo'], 'bar')
        self.assertEqual(job.meta['baz'], 42)

    def test_enqueue_with_failure_ttl(self):
        """enqueue() properly sets job.failure_ttl"""
        q = Queue(connection=self.connection)
        job = q.enqueue(say_hello, failure_ttl=10)
        # Reload from Redis so the persisted value is what gets checked
        job.refresh()
        self.assertEqual(job.failure_ttl, 10)

    def test_job_timeout(self):
        """Timeout can be passed via job_timeout argument"""
        queue = Queue(connection=self.connection)
        job = queue.enqueue(echo, 1, job_timeout=15)
        self.assertEqual(job.timeout, 15)

        # Not passing job_timeout will use queue._default_timeout
        job = queue.enqueue(echo, 1)
        self.assertEqual(job.timeout, queue._default_timeout)

        # job_timeout = 0 is not allowed
        self.assertRaises(ValueError, queue.enqueue, echo, 1, job_timeout=0)

    def test_default_timeout(self):
        """Jobs without an explicit timeout get the queue's default timeout"""
        # Queue created without default_timeout: Queue.DEFAULT_TIMEOUT applies
        queue = Queue(connection=self.connection)
        job = queue.enqueue(echo, 1)
        self.assertEqual(job.timeout, queue.DEFAULT_TIMEOUT)

        job = Job.create(func=echo, connection=self.connection)
        job = queue.enqueue_job(job)
        self.assertEqual(job.timeout, queue.DEFAULT_TIMEOUT)

        # Queue created with default_timeout: that value applies instead
        queue = Queue(connection=self.connection, default_timeout=15)
        job = queue.enqueue(echo, 1)
        self.assertEqual(job.timeout, 15)

        job = Job.create(func=echo, connection=self.connection)
        job = queue.enqueue_job(job)
        self.assertEqual(job.timeout, 15)

    def test_synchronous_timeout(self):
        """With is_async=False, result_ttl determines the expiry of the job key"""
        queue = Queue(is_async=False, connection=self.connection)
        self.assertFalse(queue.is_async)

        # Redis TTL reports -1 for a key that exists without an expiry
        no_expire_job = queue.enqueue(echo, result_ttl=-1)
        self.assertEqual(queue.connection.ttl(no_expire_job.key), -1)

        # Redis TTL reports -2 for a key that does not exist
        delete_job = queue.enqueue(echo, result_ttl=0)
        self.assertEqual(queue.connection.ttl(delete_job.key), -2)

        keep_job = queue.enqueue(echo, result_ttl=100)
        self.assertLessEqual(queue.connection.ttl(keep_job.key), 100)

    def test_synchronous_ended_at(self):
        """With is_async=False, the returned job already has ended_at set"""
        queue = Queue(is_async=False, connection=self.connection)
        echo_job = queue.enqueue(echo)
        self.assertIsNotNone(echo_job.ended_at)

    def test_enqueue_explicit_args(self):
        """enqueue() works for both implicit/explicit args."""
        q = Queue(connection=self.connection)

        # Implicit args/kwargs mode
        job = q.enqueue(echo, 1, job_timeout=1, result_ttl=1, bar='baz')
        self.assertEqual(job.timeout, 1)
        self.assertEqual(job.result_ttl, 1)
        self.assertEqual(job.perform(), ((1,), {'bar': 'baz'}))

        # Explicit kwargs mode
        kwargs = {
            'timeout': 1,
            'result_ttl': 1,
        }
        job = q.enqueue(echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
        self.assertEqual(job.timeout, 2)
        self.assertEqual(job.result_ttl, 2)
        self.assertEqual(job.perform(), ((1,), {'timeout': 1, 'result_ttl': 1}))

        # Explicit args and kwargs should also work with enqueue_at
        scheduled_time = datetime.now(timezone.utc) + timedelta(seconds=10)
        job = q.enqueue_at(scheduled_time, echo, job_timeout=2, result_ttl=2, args=[1], kwargs=kwargs)
        self.assertEqual(job.timeout, 2)
        self.assertEqual(job.result_ttl, 2)
        self.assertEqual(job.perform(), ((1,), {'timeout': 1, 'result_ttl': 1}))

        # Positional arguments are not allowed if explicit args and kwargs are used
        self.assertRaises(Exception, q.enqueue, echo, 1, kwargs=kwargs)

    def test_all_queues(self):
        """All queues"""
        q1 = Queue('first-queue', connection=self.connection)
        q2 = Queue('second-queue', connection=self.connection)
        q3 = Queue('third-queue', connection=self.connection)

        # Ensure a queue is added only once a job is enqueued
        self.assertEqual(len(Queue.all(connection=self.connection)), 0)
        q1.enqueue(say_hello)
        self.assertEqual(len(Queue.all(connection=self.connection)), 1)

        # Ensure this holds true for multiple queues
        q2.enqueue(say_hello)
        q3.enqueue(say_hello)
        names = [q.name for q in Queue.all(connection=self.connection)]
        self.assertEqual(len(Queue.all(connection=self.connection)), 3)

        # Verify names
        self.assertIn('first-queue', names)
        self.assertIn('second-queue', names)
        self.assertIn('third-queue', names)

        # Now empty two queues
        w = Worker([q2, q3], connection=self.connection)
        w.work(burst=True)

        # Queue.all() should still report the empty queues
        self.assertEqual(len(Queue.all(connection=self.connection)), 3)

    def test_all_custom_job(self):
        """Queue.all() hands the given job_class to the queues it returns"""

        class CustomJob(Job):
            pass

        q = Queue('all-queue', connection=self.connection)
        q.enqueue(say_hello)
        queues = Queue.all(job_class=CustomJob, connection=self.connection)
        self.assertEqual(len(queues), 1)
        self.assertIs(queues[0].job_class, CustomJob)

    def test_all_queues_with_only_deferred_jobs(self):
        """All queues with only deferred jobs"""
        queue_with_queued_jobs = Queue('queue_with_queued_jobs', connection=self.connection)
        queue_with_deferred_jobs = Queue('queue_with_deferred_jobs', connection=self.connection)

        parent_job = queue_with_queued_jobs.enqueue(say_hello)
        queue_with_deferred_jobs.enqueue(say_hello, depends_on=parent_job)

        # Ensure all queues are listed
        self.assertEqual(len(Queue.all(connection=self.connection)), 2)
        names = [q.name for q in Queue.all(connection=self.connection)]

        # Verify names
        self.assertIn('queue_with_queued_jobs', names)
        self.assertIn('queue_with_deferred_jobs', names)

    def test_from_queue_key(self):
        """Ensure being able to get a Queue instance manually from Redis"""
        q = Queue(connection=self.connection)
        key = Queue.redis_queue_namespace_prefix + 'default'
        reverse_q = Queue.from_queue_key(key, connection=self.connection)
        self.assertEqual(q, reverse_q)

    def test_from_queue_key_error(self):
        """Ensure that an exception is raised if the queue prefix is wrong"""
        key = 'some:weird:prefix:' + 'default'
        self.assertRaises(ValueError, Queue.from_queue_key, key, connection=self.connection)

    def test_enqueue_dependents(self):
        """Enqueueing dependent jobs pushes all jobs in the depends set to the queue
        and removes them from DeferredJobRegistry."""
        q = Queue(connection=self.connection)
        parent_job = Job.create(func=say_hello, connection=self.connection)
        parent_job.save()
        job_1 = q.enqueue(say_hello, depends_on=parent_job)
        job_2 = q.enqueue(say_hello, depends_on=parent_job)

        registry = DeferredJobRegistry(q.name, connection=self.connection)

        parent_job.set_status(JobStatus.FINISHED)

        self.assertEqual(set(registry.get_job_ids()), {job_1.id, job_2.id})

        # Before dependents are enqueued the queue is still empty
        self.assertEqual(q.job_ids, [])

        # After dependents are enqueued, job_1 and job_2 should be in queue
        q.enqueue_dependents(parent_job)
        self.assertEqual(set(q.job_ids), {job_2.id, job_1.id})
        self.assertFalse(self.connection.exists(parent_job.dependents_key))

        # DeferredJobRegistry should also be empty
        self.assertEqual(registry.get_job_ids(), [])

    def test_enqueue_dependents_on_multiple_queues(self):
        """Enqueueing dependent jobs on multiple queues pushes jobs in the queues
        and removes them from DeferredJobRegistry for each different queue."""
        q_1 = Queue('queue_1', connection=self.connection)
        q_2 = Queue('queue_2', connection=self.connection)
        parent_job = Job.create(func=say_hello, connection=self.connection)
        parent_job.save()
        job_1 = q_1.enqueue(say_hello, depends_on=parent_job)
        job_2 = q_2.enqueue(say_hello, depends_on=parent_job)

        # Each queue has its own DeferredJobRegistry
        registry_1 = DeferredJobRegistry(q_1.name, connection=self.connection)
        self.assertEqual(set(registry_1.get_job_ids()), {job_1.id})
        registry_2 = DeferredJobRegistry(q_2.name, connection=self.connection)

        parent_job.set_status(JobStatus.FINISHED)

        self.assertEqual(set(registry_2.get_job_ids()), {job_2.id})

        # Before dependents are enqueued both queues are still empty
        self.assertEqual(q_1.job_ids, [])
        self.assertEqual(q_2.job_ids, [])

        # After dependents are enqueued, job_1 should be in queue_1 and
        # job_2 should be in queue_2
        q_1.enqueue_dependents(parent_job)
        q_2.enqueue_dependents(parent_job)
        self.assertEqual(set(q_1.job_ids), {job_1.id})
        self.assertEqual(set(q_2.job_ids), {job_2.id})
        self.assertFalse(self.connection.exists(parent_job.dependents_key))

        # DeferredJobRegistry should also be empty
        self.assertEqual(registry_1.get_job_ids(), [])
        self.assertEqual(registry_2.get_job_ids(), [])

    def test_enqueue_job_with_dependency(self):
        """Jobs are enqueued only when their dependencies are finished."""
        # Job with unfinished dependency is not immediately enqueued
        parent_job = Job.create(func=say_hello, connection=self.connection)
        parent_job.save()
        q = Queue(connection=self.connection)
        job = q.enqueue_call(say_hello, depends_on=parent_job)
        self.assertEqual(q.job_ids, [])
        self.assertEqual(job.get_status(), JobStatus.DEFERRED)

        # Jobs dependent on finished jobs are immediately enqueued
        parent_job.set_status(JobStatus.FINISHED)
        parent_job.save()
        job = q.enqueue_call(say_hello, depends_on=parent_job)
        self.assertEqual(q.job_ids, [job.id])
        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
        self.assertEqual(job.get_status(), JobStatus.QUEUED)

    def test_enqueue_job_with_dependency_and_pipeline(self):
        """Jobs are enqueued only when their dependencies are finished, and by the caller when passing a pipeline."""
        # Job with unfinished dependency is not immediately enqueued
        parent_job = Job.create(func=say_hello, connection=self.connection)
        parent_job.save()
        q = Queue(connection=self.connection)
        with q.connection.pipeline() as pipe:
            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.get_status(refresh=False), JobStatus.DEFERRED)
            # Not in registry before execute, since passed in pipeline
            self.assertEqual(len(q.deferred_job_registry), 0)
            pipe.execute()

        # Only in registry after execute, since passed in pipeline
        self.assertEqual(len(q.deferred_job_registry), 1)

        # Jobs dependent on finished jobs are immediately enqueued
        parent_job.set_status(JobStatus.FINISHED)
        parent_job.save()
        with q.connection.pipeline() as pipe:
            job = q.enqueue_call(say_hello, depends_on=parent_job, pipeline=pipe)
            # Pre execute conditions
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
            pipe.execute()

        # Post execute conditions
        self.assertEqual(q.job_ids, [job.id])
        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)
        self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)

    def test_enqueue_job_with_no_dependency_prior_watch_and_pipeline(self):
        """A pipeline that already issued WATCH can be used to enqueue a job without
        dependencies; the job reaches the queue only when the caller executes the pipeline."""
        q = Queue(connection=self.connection)
        with q.connection.pipeline() as pipe:
            pipe.watch(b'fake_key')  # Test watch then enqueue
            job = q.enqueue_call(say_hello, pipeline=pipe)
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
            # Not in queue before execute, since passed in pipeline
            self.assertEqual(len(q), 0)
            # Make sure modifying key doesn't cause issues, if in multi mode won't fail
            pipe.set(b'fake_key', b'fake_value')
            pipe.execute()

        # Only in queue after execute, since passed in pipeline
        self.assertEqual(len(q), 1)

    def test_enqueue_many_internal_pipeline(self):
        """Jobs should be enqueued in bulk with an internal pipeline, enqueued in order provided
        (but at_front still applies)"""
        q = Queue(connection=self.connection)
        job_1_data = Queue.prepare_data(say_hello, job_id='fake_job_id_1', at_front=False)
        job_2_data = Queue.prepare_data(say_hello, job_id='fake_job_id_2', at_front=False)
        job_3_data = Queue.prepare_data(say_hello, job_id='fake_job_id_3', at_front=True)
        jobs = q.enqueue_many(
            [job_1_data, job_2_data, job_3_data],
        )
        for job in jobs:
            self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)

        # No pipeline was passed, so the jobs are already in the queue
        self.assertEqual(len(q), 3)
        self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])
        self.assertEqual(len(Queue.all(connection=self.connection)), 1)

    def test_enqueue_many_with_passed_pipeline(self):
        """Jobs should be enqueued in bulk with a passed pipeline, enqueued in order provided
        (but at_front still applies)"""
        q = Queue(connection=self.connection)
        with q.connection.pipeline() as pipe:
            job_1_data = Queue.prepare_data(say_hello, job_id='fake_job_id_1', at_front=False)
            job_2_data = Queue.prepare_data(say_hello, job_id='fake_job_id_2', at_front=False)
            job_3_data = Queue.prepare_data(say_hello, job_id='fake_job_id_3', at_front=True)
            jobs = q.enqueue_many([job_1_data, job_2_data, job_3_data], pipeline=pipe)
            # Not in queue before execute, since passed in pipeline
            self.assertEqual(q.job_ids, [])
            for job in jobs:
                self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
            pipe.execute()

        # Only in queue after execute, since passed in pipeline
        self.assertEqual(len(q), 3)
        self.assertEqual(q.job_ids, ['fake_job_id_3', 'fake_job_id_1', 'fake_job_id_2'])

    def test_enqueue_different_queues_with_passed_pipeline(self):
        """Jobs should be enqueued into different queues in a provided pipeline"""
        q1 = Queue(name='q1', connection=self.connection)
        q2 = Queue(name='q2', connection=self.connection)
        q3 = Queue(name='q3', connection=self.connection)
        queues = [q1, q2, q3]
        jobs = []
        with self.connection.pipeline() as pipe:
            for idx, q in enumerate(queues):
                jobs.append(q.enqueue_call(say_hello, job_id=f'fake_job_id_{idx}', pipeline=pipe))
            for job in jobs:
                self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED)
            pipe.execute()

        self.assertEqual(len(jobs), 3)
        for idx, (job, q) in enumerate(zip(jobs, queues)):
            # Check job is in the correct queue
            self.assertEqual(job.id, f'fake_job_id_{idx}')
            self.assertEqual(job.origin, q.name)
            # Check queue contains the job
            self.assertIn(job.id, q.job_ids)

    def test_enqueue_job_with_dependency_by_id(self):
        """Can specify job dependency with job object or job id."""
        parent_job = Job.create(func=say_hello, connection=self.connection)
        parent_job.save()

        q = Queue(connection=self.connection)
        q.enqueue_call(say_hello, depends_on=parent_job.id)
        self.assertEqual(q.job_ids, [])

        # Jobs dependent on finished jobs are immediately enqueued
        parent_job.set_status(JobStatus.FINISHED)
        parent_job.save()
        job = q.enqueue_call(say_hello, depends_on=parent_job.id)
        self.assertEqual(q.job_ids, [job.id])
        self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT)

    def test_enqueue_job_with_dependency_and_timeout(self):
        """Jobs remember their timeout when enqueued as a dependency."""
        # Job with unfinished dependency is not immediately enqueued
        parent_job = Job.create(func=say_hello, connection=self.connection)
        parent_job.save()
        q = Queue(connection=self.connection)
        job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
        self.assertEqual(q.job_ids, [])
        self.assertEqual(job.timeout, 123)

        # Jobs dependent on finished jobs are immediately enqueued
        parent_job.set_status(JobStatus.FINISHED)
        parent_job.save()
        job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
        self.assertEqual(q.job_ids, [job.id])
        self.assertEqual(job.timeout, 123)

    def test_enqueue_job_with_multiple_queued_dependencies(self):
        """A job whose dependencies are all still queued is deferred rather than enqueued."""
        parent_jobs = [Job.create(func=say_hello, connection=self.connection) for _ in range(2)]

        for parent in parent_jobs:
            parent._status = JobStatus.QUEUED
            parent.save()

        q = Queue(connection=self.connection)
        # MultipleDependencyJob.create injects `_dependency_ids` on the created job
        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
            job = q.enqueue(
                say_hello, depends_on=parent_jobs[0], _dependency_ids=[parent.id for parent in parent_jobs]
            )
            self.assertEqual(job.get_status(), JobStatus.DEFERRED)
            self.assertEqual(q.job_ids, [])
            self.assertEqual(job.fetch_dependencies(), parent_jobs)

    def test_enqueue_job_with_multiple_finished_dependencies(self):
        """A job whose dependencies have all finished is enqueued immediately."""
        parent_jobs = [Job.create(func=say_hello, connection=self.connection) for _ in range(2)]

        for parent in parent_jobs:
            parent._status = JobStatus.FINISHED
            parent.save()

        q = Queue(connection=self.connection)
        # MultipleDependencyJob.create injects `_dependency_ids` on the created job
        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
            job = q.enqueue(
                say_hello, depends_on=parent_jobs[0], _dependency_ids=[parent.id for parent in parent_jobs]
            )
            self.assertEqual(job.get_status(), JobStatus.QUEUED)
            self.assertEqual(q.job_ids, [job.id])
            self.assertEqual(job.fetch_dependencies(), parent_jobs)

    def test_enqueues_dependent_if_other_dependencies_finished(self):
        """enqueue_dependents() enqueues a deferred job once its last unfinished dependency finishes."""
        parent_jobs = [Job.create(func=say_hello, connection=self.connection) for _ in range(3)]

        parent_jobs[0]._status = JobStatus.STARTED
        parent_jobs[0].save()

        parent_jobs[1]._status = JobStatus.FINISHED
        parent_jobs[1].save()

        parent_jobs[2]._status = JobStatus.FINISHED
        parent_jobs[2].save()

        q = Queue(connection=self.connection)
        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
            # dependent job deferred, b/c parent_job 0 is still 'started'
            dependent_job = q.enqueue(
                say_hello, depends_on=parent_jobs[0], _dependency_ids=[parent.id for parent in parent_jobs]
            )
            self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)

        # now set parent job 0 to 'finished'
        parent_jobs[0].set_status(JobStatus.FINISHED)

        q.enqueue_dependents(parent_jobs[0])
        self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED)
        self.assertEqual(q.job_ids, [dependent_job.id])

    def test_does_not_enqueue_dependent_if_other_dependencies_not_finished(self):
        """enqueue_dependents() leaves a job deferred while its dependencies are unfinished."""
        started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED, connection=self.connection)
        started_dependency.save()

        queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED, connection=self.connection)
        queued_dependency.save()

        q = Queue(connection=self.connection)
        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
            dependent_job = q.enqueue(
                say_hello,
                depends_on=[started_dependency],
                _dependency_ids=[started_dependency.id, queued_dependency.id],
            )
            self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)

        q.enqueue_dependents(started_dependency)
        self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
        self.assertEqual(q.job_ids, [])

    def test_fetch_job_successful(self):
        """Fetch a job from a queue."""
        q = Queue('example', connection=self.connection)
        job_orig = q.enqueue(say_hello)
        job_fetch: Job = q.fetch_job(job_orig.id)  # type: ignore
        self.assertIsNotNone(job_fetch)
        self.assertEqual(job_orig.id, job_fetch.id)
        self.assertEqual(job_orig.description, job_fetch.description)

    def test_fetch_job_missing(self):
        """Fetching a job ID that doesn't exist returns None."""
        q = Queue('example', connection=self.connection)
        job = q.fetch_job('123')
        self.assertIsNone(job)

    def test_fetch_job_different_queue(self):
        """Fetching a job through a queue other than the one it was enqueued on returns None."""
        q1 = Queue('example1', connection=self.connection)
        q2 = Queue('example2', connection=self.connection)
        job_orig = q1.enqueue(say_hello)
        job_fetch = q2.fetch_job(job_orig.id)
        self.assertIsNone(job_fetch)

        job_fetch = q1.fetch_job(job_orig.id)
        self.assertIsNotNone(job_fetch)

    def test_getting_registries(self):
        """Getting job registries from queue object"""
        queue = Queue('example', connection=self.connection)
        self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue))
        self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue))
        self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
        self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
        self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
        self.assertEqual(queue.canceled_job_registry, CanceledJobRegistry(queue=queue))

    def test_getting_registries_with_serializer(self):
        """Getting job registries from queue object (with custom serializer)"""
        queue = Queue('example', connection=self.connection, serializer=JSONSerializer)
        self.assertEqual(queue.scheduled_job_registry, ScheduledJobRegistry(queue=queue))
        self.assertEqual(queue.started_job_registry, StartedJobRegistry(queue=queue))
        self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
        self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
        self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
        self.assertEqual(queue.canceled_job_registry, CanceledJobRegistry(queue=queue))

        # Make sure we don't use default when queue has custom
        self.assertEqual(queue.scheduled_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.started_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.failed_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.deferred_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.finished_job_registry.serializer, JSONSerializer)
        self.assertEqual(queue.canceled_job_registry.serializer, JSONSerializer)

    def test_enqueue_with_retry(self):
        """Enqueueing with retry_strategy works"""
        queue = Queue('example', connection=self.connection)
        job = queue.enqueue(say_hello, retry=Retry(max=3, interval=5))

        # Re-fetch so the persisted retry settings are what gets checked
        job = Job.fetch(job.id, connection=self.connection)
        self.assertEqual(job.retries_left, 3)
        self.assertEqual(job.retry_intervals, [5])
class TestJobScheduling(RQTestCase):
    """Tests for scheduling jobs through a ``Queue``."""

    def test_enqueue_at(self):
        """enqueue_at() creates a job in ScheduledJobRegistry"""
        q = Queue(connection=self.connection)
        run_at = datetime.now(timezone.utc) + timedelta(seconds=10)

        scheduled_job = q.enqueue_at(run_at, say_hello)

        scheduled_registry = ScheduledJobRegistry(queue=q)
        self.assertIn(scheduled_job, scheduled_registry)

        # The expected value is the scheduled time with microseconds zeroed
        expected_time = run_at.replace(microsecond=0)
        self.assertEqual(scheduled_registry.get_expiration_time(scheduled_job), expected_time)
|