1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
import json
import os
import signal
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from redis import Redis
from .worker import BaseWorker
from rq.exceptions import InvalidJobOperation
from rq.job import Job
PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'
def send_command(connection: 'Redis', worker_name: str, command: str, **kwargs):
"""
Sends a command to a worker.
A command is just a string, available commands are:
- `shutdown`: Shuts down a worker
- `kill-horse`: Command for the worker to kill the current working horse
- `stop-job`: A command for the worker to stop the currently running job
The command string will be parsed into a dictionary and send to a PubSub Topic.
Workers listen to the PubSub, and `handle` the specific command.
Args:
connection (Redis): A Redis Connection
worker_name (str): The Job ID
"""
payload = {'command': command}
if kwargs:
payload.update(kwargs)
connection.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload))
def parse_payload(payload: dict[Any, Any]) -> dict[Any, Any]:
"""
Returns a dict of command data
Args:
payload (dict): Parses the payload dict.
"""
return json.loads(payload['data'].decode())
def send_shutdown_command(connection: 'Redis', worker_name: str):
"""
Sends a command to shutdown a worker.
Args:
connection (Redis): A Redis Connection
worker_name (str): The Job ID
"""
send_command(connection, worker_name, 'shutdown')
def send_kill_horse_command(connection: 'Redis', worker_name: str):
"""
Tell worker to kill it's horse
Args:
connection (Redis): A Redis Connection
worker_name (str): The Job ID
"""
send_command(connection, worker_name, 'kill-horse')
def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None):
"""
Instruct a worker to stop a job
Args:
connection (Redis): A Redis Connection
job_id (str): The Job ID
serializer (): The serializer
"""
job = Job.fetch(job_id, connection=connection, serializer=serializer)
if not job.worker_name:
raise InvalidJobOperation('Job is not currently executing')
send_command(connection, job.worker_name, 'stop-job', job_id=job_id)
def handle_command(worker: 'BaseWorker', payload: dict[Any, Any]):
"""Parses payload and routes commands to the worker.
Args:
worker (Worker): The worker to use
payload (Dict[Any, Any]): The Payload
"""
if payload['command'] == 'stop-job':
handle_stop_job_command(worker, payload)
elif payload['command'] == 'shutdown':
handle_shutdown_command(worker)
elif payload['command'] == 'kill-horse':
handle_kill_worker_command(worker, payload)
def handle_shutdown_command(worker: 'BaseWorker'):
"""Perform shutdown command.
Args:
worker (Worker): The worker to use.
"""
worker.log.info('Received shutdown command, sending SIGINT signal.')
pid = os.getpid()
os.kill(pid, signal.SIGINT)
def handle_kill_worker_command(worker: 'BaseWorker', payload: dict[Any, Any]):
"""
Stops work horse
Args:
worker (Worker): The worker to stop
payload (Dict[Any, Any]): The payload.
"""
worker.log.info('Received kill horse command.')
if worker.horse_pid:
worker.log.info('Killing horse...')
worker.kill_horse()
else:
worker.log.info('Worker is not working, kill horse command ignored')
def handle_stop_job_command(worker: 'BaseWorker', payload: dict[Any, Any]):
"""Handles stop job command.
Args:
worker (Worker): The worker to use
payload (Dict[Any, Any]): The payload.
"""
job_id = payload.get('job_id')
worker.log.debug('Received command to stop job %s', job_id)
if job_id and worker.get_current_job_id() == job_id:
# Sets the '_stopped_job_id' so that the job failure handler knows it
# was intentional.
worker._stopped_job_id = job_id
worker.kill_horse()
else:
worker.log.info('Not working on job %s, command ignored.', job_id)
|