1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
from collections.abc import Iterable
from redis.client import Pipeline
from redis.exceptions import WatchError
from .job import Job
class Dependency:
@classmethod
def get_jobs_with_met_dependencies(cls, jobs: Iterable['Job'], pipeline: Pipeline):
jobs_with_met_dependencies = []
jobs_with_unmet_dependencies = []
for job in jobs:
while True:
try:
pipeline.watch(*[Job.key_for(dependency_id) for dependency_id in job._dependency_ids])
job.register_dependency(pipeline=pipeline)
if job.dependencies_are_met(pipeline=pipeline):
jobs_with_met_dependencies.append(job)
else:
jobs_with_unmet_dependencies.append(job)
pipeline.execute()
except WatchError:
continue
break
return jobs_with_met_dependencies, jobs_with_unmet_dependencies
|