1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
|
from multiprocessing import Process
from rq import Queue, SimpleWorker, Worker
from rq.job import Dependency, Job, JobStatus
from rq.utils import current_timestamp
from tests import RQTestCase
from tests.fixtures import check_dependencies_are_met, div_by_zero, kill_horse, long_running_job, say_hello
class TestDependencies(RQTestCase):
def test_allow_failure_is_persisted(self):
"""Ensure that job.allow_dependency_failures is properly set
when providing Dependency object to depends_on."""
dep_job = Job.create(func=say_hello, connection=self.connection)
# default to False, maintaining current behavior
job = Job.create(func=say_hello, connection=self.connection, depends_on=Dependency([dep_job]))
job.save()
Job.fetch(job.id, connection=self.connection)
self.assertFalse(job.allow_dependency_failures)
job = Job.create(
func=say_hello, connection=self.connection, depends_on=Dependency([dep_job], allow_failure=True)
)
job.save()
job = Job.fetch(job.id, connection=self.connection)
self.assertTrue(job.allow_dependency_failures)
jobs = Job.fetch_many([job.id], connection=self.connection)
self.assertTrue(jobs[0].allow_dependency_failures)
def test_deferred_task_not_enqueued_when_dependencies_are_not_finished(self):
job_a = Job.create(say_hello, connection=self.connection)
job_b = Job.create(say_hello, connection=self.connection)
job_c = Job.create(say_hello, connection=self.connection, depends_on=[job_a, job_b])
job_a.save()
job_b.save()
job_c.save()
queue = Queue('default', connection=self.connection)
queue.enqueue_job(job_c)
self.assertEqual(JobStatus.DEFERRED, job_c.get_status())
self.assertEqual(0, queue.count)
queue.enqueue_job(job_a)
worker = SimpleWorker([queue], connection=queue.connection)
worker.work(burst=True)
self.assertEqual(JobStatus.FINISHED, job_a.get_status())
# Child job is started but should not!!!
self.assertEqual(JobStatus.DEFERRED, job_c.get_status(refresh=True))
def test_job_dependency(self):
"""Enqueue dependent jobs only when appropriate"""
q = Queue(connection=self.connection)
w = SimpleWorker([q], connection=q.connection)
# enqueue dependent job when parent successfully finishes
parent_job = q.enqueue(say_hello)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.connection)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
q.empty()
# don't enqueue dependent job when parent fails
parent_job = q.enqueue(div_by_zero)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.connection)
self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
q.empty()
# don't enqueue dependent job when Dependency.allow_failure=False (the default)
parent_job = q.enqueue(div_by_zero)
dependency = Dependency(jobs=parent_job)
job = q.enqueue_call(say_hello, depends_on=dependency)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.connection)
self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
# enqueue dependent job when Dependency.allow_failure=True
parent_job = q.enqueue(div_by_zero)
dependency = Dependency(jobs=parent_job, allow_failure=True)
job = q.enqueue_call(say_hello, depends_on=dependency)
job = Job.fetch(job.id, connection=self.connection)
self.assertTrue(job.allow_dependency_failures)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.connection)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
# When a failing job has multiple dependents, only enqueue those
# with allow_failure=True
parent_job = q.enqueue(div_by_zero)
job_allow_failure = q.enqueue(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=True))
job = q.enqueue(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=False))
w.work(burst=True, max_jobs=1)
self.assertEqual(parent_job.get_status(), JobStatus.FAILED)
self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED)
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
q.empty()
# only enqueue dependent job when all dependencies have finished/failed
first_parent_job = q.enqueue(div_by_zero)
second_parent_job = q.enqueue(say_hello)
dependencies = Dependency(jobs=[first_parent_job, second_parent_job], allow_failure=True)
job = q.enqueue_call(say_hello, depends_on=dependencies)
w.work(burst=True, max_jobs=1)
self.assertEqual(first_parent_job.get_status(), JobStatus.FAILED)
self.assertEqual(second_parent_job.get_status(), JobStatus.QUEUED)
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
# When second job finishes, dependent job should be queued
w.work(burst=True, max_jobs=1)
self.assertEqual(second_parent_job.get_status(), JobStatus.FINISHED)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.connection)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
# Test dependant is enqueued at front
q.empty()
parent_job = q.enqueue(say_hello)
q.enqueue(say_hello, job_id='fake_job_id_1', depends_on=Dependency(jobs=[parent_job]))
q.enqueue(say_hello, job_id='fake_job_id_2', depends_on=Dependency(jobs=[parent_job], enqueue_at_front=True))
w.work(burst=True, max_jobs=1)
self.assertEqual(q.job_ids, ['fake_job_id_2', 'fake_job_id_1'])
def test_multiple_jobs_with_dependencies(self):
"""Enqueue dependent jobs only when appropriate"""
q = Queue(connection=self.connection)
w = SimpleWorker([q], connection=q.connection)
# Multiple jobs are enqueued with correct status
parent_job = q.enqueue(say_hello)
job_no_deps = Queue.prepare_data(say_hello)
job_with_deps = Queue.prepare_data(say_hello, depends_on=parent_job)
jobs = q.enqueue_many([job_no_deps, job_with_deps])
self.assertEqual(jobs[0].get_status(), JobStatus.QUEUED)
self.assertEqual(jobs[1].get_status(), JobStatus.DEFERRED)
w.work(burst=True, max_jobs=1)
self.assertEqual(jobs[1].get_status(), JobStatus.QUEUED)
job_with_met_deps = Queue.prepare_data(say_hello, depends_on=parent_job)
jobs = q.enqueue_many([job_with_met_deps])
self.assertEqual(jobs[0].get_status(), JobStatus.QUEUED)
q.empty()
def test_dependency_list_in_depends_on(self):
"""Enqueue with Dependency list in depends_on"""
q = Queue(connection=self.connection)
w = SimpleWorker([q], connection=q.connection)
# enqueue dependent job when parent successfully finishes
parent_job1 = q.enqueue(say_hello)
parent_job2 = q.enqueue(say_hello)
job = q.enqueue_call(say_hello, depends_on=[Dependency([parent_job1]), Dependency([parent_job2])])
w.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
def test_enqueue_job_dependency(self):
"""Enqueue via Queue.enqueue_job() with depencency"""
q = Queue(connection=self.connection)
w = SimpleWorker([q], connection=q.connection)
# enqueue dependent job when parent successfully finishes
parent_job = Job.create(say_hello, connection=self.connection)
parent_job.save()
job = Job.create(say_hello, connection=self.connection, depends_on=parent_job)
q.enqueue_job(job)
w.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
q.enqueue_job(parent_job)
w.work(burst=True)
self.assertEqual(parent_job.get_status(), JobStatus.FINISHED)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
def test_enqueue_job_dependency_sets_ttl(self):
"""Ensures that the TTL of jobs in the deferred queue is set"""
q = Queue(connection=self.connection)
parent_job = Job.create(say_hello, connection=self.connection)
parent_job.save()
timestamp = current_timestamp()
ttl = 5
job = Job.create(say_hello, connection=self.connection, depends_on=parent_job, ttl=ttl)
q.enqueue_job(job)
score = self.connection.zscore(q.deferred_job_registry.key, job.id)
self.assertLess(score, timestamp + ttl + 2)
self.assertGreater(score, timestamp + ttl - 2)
def test_dependencies_are_met_if_parent_is_canceled(self):
"""When parent job is canceled, it should be treated as failed"""
queue = Queue(connection=self.connection)
job = queue.enqueue(say_hello)
job.set_status(JobStatus.CANCELED)
dependent_job = queue.enqueue(say_hello, depends_on=job)
# dependencies_are_met() should return False, whether or not
# parent_job is provided
self.assertFalse(dependent_job.dependencies_are_met(job))
self.assertFalse(dependent_job.dependencies_are_met())
def test_can_enqueue_job_if_dependency_is_deleted(self):
queue = Queue(connection=self.connection)
dependency_job = queue.enqueue(say_hello, result_ttl=0)
w = Worker([queue], connection=self.connection)
w.work(burst=True)
assert queue.enqueue(say_hello, depends_on=dependency_job)
def test_dependencies_are_met_if_dependency_is_deleted(self):
queue = Queue(connection=self.connection)
dependency_job = queue.enqueue(say_hello, result_ttl=0)
dependent_job = queue.enqueue(say_hello, depends_on=dependency_job)
w = Worker([queue], connection=self.connection)
w.work(burst=True, max_jobs=1)
assert dependent_job.dependencies_are_met()
assert dependent_job.get_status() == JobStatus.QUEUED
def test_dependencies_are_met_at_execution_time(self):
queue = Queue(connection=self.connection)
queue.empty()
queue.enqueue(say_hello, job_id='A')
queue.enqueue(say_hello, job_id='B')
job_c = queue.enqueue(check_dependencies_are_met, job_id='C', depends_on=['A', 'B'])
job_c.dependencies_are_met()
w = Worker([queue], connection=self.connection)
w.work(burst=True)
assert job_c.result
def test_allow_failures_when_work_horse_killed(self):
"""Ensure that allow_failure is respected when a worker is killed"""
queue = Queue(connection=self.connection)
job = queue.enqueue(long_running_job, 10, horse_pid_key='horse_pid_key')
job2 = queue.enqueue(say_hello, depends_on=Dependency(jobs=job, allow_failure=True))
# Wait 1 second before killing the horse to simulate horse terminating unexpectedly
p = Process(target=kill_horse, args=('horse_pid_key', self.connection.connection_pool.connection_kwargs, 1))
p.start()
worker = Worker([queue], connection=self.connection)
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertEqual(job2.get_status(), JobStatus.FINISHED)
def test_dependency_accepts_single_job(self):
"""Test that Dependency constructor accepts a single Job instance"""
q = Queue(connection=self.connection)
w = SimpleWorker([q], connection=q.connection)
# Test with single Job instance
parent_job = q.enqueue(say_hello)
dependency = Dependency(parent_job) # Single job, not in a list
job = q.enqueue_call(say_hello, depends_on=dependency)
w.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
q.empty()
# Test with single Job instance and allow_failure=True
parent_job = q.enqueue(div_by_zero)
dependency = Dependency(parent_job, allow_failure=True) # Single job with allow_failure
job = q.enqueue_call(say_hello, depends_on=dependency)
w.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
q.empty()
# Test with single job ID string
parent_job = q.enqueue(say_hello)
dependency = Dependency(parent_job.id) # Single job ID string
job = q.enqueue_call(say_hello, depends_on=dependency)
w.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
|