1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
|
from datetime import timedelta
from rq import Queue, Worker
from rq.job import Job
from rq.registry import ScheduledJobRegistry
from rq.repeat import Repeat
from rq.utils import now
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
class TestRepeat(RQTestCase):
    """Unit tests for Repeat and the repeat-related fields stored on a Job"""

    def test_repeat_class_initialization(self):
        """Repeat exposes `times`/`intervals` and rejects invalid arguments"""
        # A scalar interval is exposed as a one-element list
        single = Repeat(times=3, interval=5)
        self.assertEqual(single.times, 3)
        self.assertEqual(single.intervals, [5])

        # A list of intervals is exposed as given
        multiple = Repeat(times=4, interval=[5, 10, 15])
        self.assertEqual(multiple.times, 4)
        self.assertEqual(multiple.intervals, [5, 10, 15])

        # Each entry pairs the expected exception with the offending arguments
        rejected = (
            # times must be at least 1
            (ValueError, {'times': 0, 'interval': 5}),
            (ValueError, {'times': -1, 'interval': 5}),
            # interval can't be negative, whether scalar or inside a list
            (ValueError, {'times': 1, 'interval': -5}),
            (ValueError, {'times': 3, 'interval': [5, -10]}),
            # interval must be int or iterable
            (TypeError, {'times': 3, 'interval': 'not_a_number'}),
        )
        for expected_error, kwargs in rejected:
            with self.assertRaises(expected_error):
                Repeat(**kwargs)

    def test_get_interval(self):
        """get_interval() picks the interval by index, reusing the last one past the end"""
        schedule = [5, 10, 15]
        # Index 3 is beyond the list length, so the last interval is expected again
        for count, expected in enumerate([5, 10, 15, 15]):
            self.assertEqual(Repeat.get_interval(count, schedule), expected)

        # A single interval is returned for every repeat count
        only_one = [7]
        for count in (0, 1):
            self.assertEqual(Repeat.get_interval(count, only_one), 7)

        # An empty intervals list shouldn't happen in practice; the lookup
        # is expected to fail with IndexError
        self.assertRaises(IndexError, Repeat.get_interval, 0, [])

    def test_persistence_of_repeat_data(self):
        """Repeat fields survive a save() / refresh() round trip"""
        saved_job = Job.create(func=say_hello, connection=self.connection)
        saved_job.repeats_left = 3
        saved_job.repeat_intervals = [5, 10, 15]
        saved_job.save()

        # Wipe the in-memory values so only the stored copy can satisfy the checks
        saved_job.repeat_intervals = None
        saved_job.repeats_left = None
        saved_job.refresh()

        self.assertEqual(saved_job.repeats_left, 3)
        self.assertEqual(saved_job.repeat_intervals, [5, 10, 15])
class TestRepeatEnqueue(RQTestCase):
    """Test that Repeat objects are correctly handled when enqueuing jobs"""

    def setUp(self):
        """Create the queue used by every test in this class"""
        super().setUp()
        self.queue = Queue(connection=self.connection)

    def test_enqueue_with_repeat(self):
        """Test that repeat parameters are stored when enqueuing a job with Repeat object"""
        repeat = Repeat(times=3, interval=[5, 10, 15])

        # Enqueue a job with a Repeat object
        job = self.queue.enqueue(say_hello, repeat=repeat)

        # Verify the job has the repeat attributes set correctly
        self.assertEqual(job.repeats_left, 3)
        self.assertEqual(job.repeat_intervals, [5, 10, 15])

        # Verify the job is persisted with the repeat attributes
        loaded_job = Job.fetch(job.id, connection=self.connection)
        self.assertEqual(loaded_job.repeats_left, 3)
        self.assertEqual(loaded_job.repeat_intervals, [5, 10, 15])

    def test_enqueue_at_with_repeat(self):
        """Test that repeat parameters are set when using enqueue_at with Repeat object"""
        repeat = Repeat(times=2, interval=[60, 120])

        # Use enqueue_at method
        job = self.queue.enqueue_at(now(), say_hello, repeat=repeat)

        # Verify the job has the repeat attributes set correctly
        self.assertEqual(job.repeats_left, 2)
        self.assertEqual(job.repeat_intervals, [60, 120])

    def test_enqueue_many_with_repeat(self):
        """Test that repeat parameters are stored when using enqueue_many with Repeat objects"""
        # Prepare job data, each with its own Repeat configuration
        job_data1 = self.queue.prepare_data(func=say_hello, repeat=Repeat(times=3, interval=10))
        job_data2 = self.queue.prepare_data(func=say_hello, repeat=Repeat(times=2, interval=[30, 60]))

        # Enqueue multiple jobs
        job_1, job_2 = self.queue.enqueue_many([job_data1, job_data2])

        # Reload both jobs so the assertions check the persisted values,
        # not just the attributes set on the in-memory objects
        job_1.refresh()
        job_2.refresh()

        self.assertEqual(job_1.repeats_left, 3)
        self.assertEqual(job_1.repeat_intervals, [10])

        self.assertEqual(job_2.repeats_left, 2)
        self.assertEqual(job_2.repeat_intervals, [30, 60])

    def test_repeat_schedule_interval_zero(self):
        """Repeat.schedule with a zero interval enqueues immediately and counts down repeats"""
        repeat = Repeat(times=2, interval=0)
        job = self.queue.enqueue(say_hello, repeat=repeat)

        Repeat.schedule(job, self.queue)

        # Job is expected on the queue since the interval is 0.
        # NOTE(review): enqueue() above presumably already put the job on the
        # queue, so this check alone may not prove schedule() re-enqueued it.
        self.assertIn(job.id, self.queue.get_job_ids())
        job.refresh()
        self.assertEqual(job.repeats_left, 1)

        Repeat.schedule(job, self.queue)
        self.assertEqual(job.repeats_left, 0)

        # Job can only be repeated twice
        with self.assertRaises(ValueError):
            Repeat.schedule(job, self.queue)

    def test_repeat_schedule_interval_greater_than_zero(self):
        """Repeat.schedule with a positive interval defers the job to the scheduled registry"""
        interval = 30  # seconds between repeats
        tolerance = timedelta(seconds=5)  # slack allowed around the expected time

        queue = self.queue
        registry = ScheduledJobRegistry(queue=queue)

        repeat = Repeat(times=3, interval=interval)
        job = queue.enqueue(say_hello, repeat=repeat)

        # Clear the queue so we can verify the job is not enqueued immediately
        queue.empty()

        # Bracket the call with timestamps to bound the expected schedule time
        before_schedule = now()
        Repeat.schedule(job, queue)
        after_schedule = now()

        # Verify job was not enqueued immediately
        self.assertNotIn(job.id, queue.get_job_ids())

        # Verify job was added to scheduled registry
        self.assertIn(job.id, registry.get_job_ids())

        # Scheduled time should be `interval` seconds from now, give or take `tolerance`
        scheduled_time = registry.get_scheduled_time(job.id)
        expected_min = before_schedule + timedelta(seconds=interval) - tolerance
        expected_max = after_schedule + timedelta(seconds=interval) + tolerance

        self.assertTrue(
            expected_min <= scheduled_time <= expected_max,
            f'Job not scheduled in expected window: {expected_min} <= {scheduled_time} <= {expected_max}',
        )

        # Check repeats_left was decremented
        job.refresh()
        self.assertEqual(job.repeats_left, 2)
class TestWorkerRepeat(RQTestCase):
    """Tests for how a Worker handles jobs enqueued with a Repeat"""

    def test_successful_job_repeat(self):
        """Successful jobs are repeated until no repeats are left; failed jobs are not repeated"""
        connection = self.connection
        queue = Queue(connection=connection)
        repeating_job = queue.enqueue(say_hello, repeat=Repeat(times=1))

        # First run: the job is processed and expected back on the queue
        first_worker = Worker([queue], connection=connection)
        first_worker.work(burst=True, max_jobs=1)
        self.assertIn(repeating_job.id, queue.get_job_ids())

        # Second run: the single repeat has been used, so the job is not re-queued
        second_worker = Worker([queue], connection=connection)
        second_worker.work(burst=True, max_jobs=1)
        self.assertNotIn(repeating_job.id, queue.get_job_ids())

        # A job that raises is expected not to be repeated
        failing_job = queue.enqueue(div_by_zero, repeat=Repeat(times=1))
        self.assertEqual(failing_job.repeats_left, 1)
        second_worker.work(burst=True)
        self.assertNotIn(failing_job.id, queue.get_job_ids())
|