1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
|
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Optional
from uuid import uuid4
from redis import Redis
if TYPE_CHECKING:
from redis.client import Pipeline
from .job import Job
from .registry import BaseRegistry, StartedJobRegistry
from .utils import as_text, current_timestamp, now, parse_composite_key
# TODO: add execution.worker
class Execution:
"""Class to represent an execution of a job."""
def __init__(self, id: str, job_id: str, connection: Redis):
self.id = id
self.job_id = job_id
self.connection = connection
right_now = now()
self.created_at = right_now
self.last_heartbeat = right_now
self._job: Optional[Job] = None
def __eq__(self, other: object) -> bool:
if not isinstance(other, Execution):
return False
return self.id == other.id
@property
def key(self) -> str:
return f'rq:execution:{self.composite_key}'
@property
def job(self) -> Job:
if self._job:
return self._job
self._job = Job.fetch(id=self.job_id, connection=self.connection)
return self._job
@property
def composite_key(self):
return f'{self.job_id}:{self.id}'
@classmethod
def fetch(cls, id: str, job_id: str, connection: Redis) -> 'Execution':
"""Fetch an execution from Redis."""
execution = cls(id=id, job_id=job_id, connection=connection)
execution.refresh()
return execution
def refresh(self):
"""Refresh execution data from Redis."""
data = self.connection.hgetall(self.key)
if not data:
raise ValueError(f'Execution {self.id} not found in Redis')
self.created_at = datetime.fromtimestamp(float(data[b'created_at']), tz=timezone.utc)
self.last_heartbeat = datetime.fromtimestamp(float(data[b'last_heartbeat']), tz=timezone.utc)
@classmethod
def from_composite_key(cls, composite_key: str, connection: Redis) -> 'Execution':
"""A combination of job_id and execution_id separated by a colon."""
job_id, execution_id = parse_composite_key(composite_key)
return cls(id=execution_id, job_id=job_id, connection=connection)
@classmethod
def create(cls, job: Job, ttl: int, pipeline: 'Pipeline') -> 'Execution':
"""Save execution data to Redis."""
id = uuid4().hex
execution = cls(id=id, job_id=job.id, connection=job.connection)
execution.save(ttl=ttl, pipeline=pipeline)
ExecutionRegistry(job_id=job.id, connection=pipeline).add(execution=execution, ttl=ttl, pipeline=pipeline)
job.started_job_registry.add_execution(execution, pipeline=pipeline, ttl=ttl, xx=False)
return execution
def save(self, ttl: int, pipeline: Optional['Pipeline'] = None):
"""Save execution data to Redis and JobExecutionRegistry."""
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, mapping=self.serialize())
# Still unsure how to handle TTL, but this should be tied to heartbeat TTL
connection.expire(self.key, ttl)
def delete(self, job: Job, pipeline: 'Pipeline'):
"""Delete an execution from Redis."""
pipeline.delete(self.key)
job.started_job_registry.remove_execution(execution=self, pipeline=pipeline)
ExecutionRegistry(job_id=self.job_id, connection=self.connection).remove(execution=self, pipeline=pipeline)
def serialize(self) -> dict:
return {
'id': self.id,
'created_at': self.created_at.timestamp(),
'last_heartbeat': self.last_heartbeat.timestamp(),
}
def heartbeat(self, started_job_registry: StartedJobRegistry, ttl: int, pipeline: 'Pipeline'):
"""Update execution heartbeat."""
# TODO: worker heartbeat should be tied to execution heartbeat
self.last_heartbeat = now()
pipeline.hset(self.key, 'last_heartbeat', self.last_heartbeat.timestamp())
pipeline.expire(self.key, ttl)
started_job_registry.add_execution(self, ttl=ttl, pipeline=pipeline, xx=True)
ExecutionRegistry(job_id=self.job_id, connection=pipeline).add(execution=self, ttl=ttl, pipeline=pipeline)
class ExecutionRegistry(BaseRegistry):
"""Class to represent a registry of job executions.
Each job has its own execution registry.
"""
key_template = 'rq:executions:{0}'
def __init__(self, job_id: str, connection: Redis):
self.connection = connection
self.job_id = job_id
self.key = self.key_template.format(job_id)
def cleanup(self, timestamp: Optional[float] = None, exception_handlers: Optional[list] = None):
"""Remove expired jobs from registry.
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified.
"""
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)
def add(self, execution: Execution, ttl: int, pipeline: 'Pipeline') -> Any: # type: ignore
"""Register an execution to registry with expiry time of now + ttl, unless it's -1 which is set to +inf
Args:
execution (Execution): The Execution to add
ttl (int, optional): The time to live. Defaults to 0.
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
Returns:
result (int): The ZADD command result
"""
score = current_timestamp() + ttl
pipeline.zadd(self.key, {execution.id: score + 60})
# Still unsure how to handle registry TTL, but it should be the same as job TTL
pipeline.expire(self.key, ttl + 60)
return
def remove(self, execution: Execution, pipeline: 'Pipeline') -> Any: # type: ignore
"""Remove an execution from registry."""
return pipeline.zrem(self.key, execution.id)
def get_execution_ids(self, start: int = 0, end: int = -1) -> list[str]:
"""Returns all executions IDs in registry"""
self.cleanup()
return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)]
def get_executions(self, start: int = 0, end: int = -1) -> list[Execution]:
"""Returns all executions IDs in registry"""
execution_ids = self.get_execution_ids(start, end)
executions = []
# TODO: This operation should be pipelined, preferably using Execution.fetch_many()
for execution_id in execution_ids:
executions.append(Execution.fetch(id=execution_id, job_id=self.job_id, connection=self.connection))
return executions
def delete(self, job: Job, pipeline: 'Pipeline'):
"""Delete the registry."""
executions = self.get_executions()
for execution in executions:
execution.delete(pipeline=pipeline, job=job)
pipeline.delete(self.key)
|