1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
|
import json
import os
import signal
from rq.exceptions import InvalidJobOperation
from rq.job import Job
# Channel name pattern used when publishing worker commands; the `%s`
# placeholder is filled with the target worker's name (see send_command).
PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'
def send_command(connection, worker_name, command, **kwargs):
    """Publish a command to a worker over the connection's pub/sub channel.

    Args:
        connection: Object exposing ``publish(channel, message)``.
        worker_name: Name substituted into ``PUBSUB_CHANNEL_TEMPLATE`` to
            build the channel the message is published on.
        command: Command identifier stored under the ``'command'`` key.
        **kwargs: Extra fields merged into the JSON message after
            ``'command'``.
    """
    # `command` is a named parameter, so kwargs can never carry a
    # conflicting 'command' key; unpacking yields the same dict as update().
    message = {'command': command, **kwargs}
    channel = PUBSUB_CHANNEL_TEMPLATE % worker_name
    connection.publish(channel, json.dumps(message))
def parse_payload(payload):
    """Return the command data carried by a pub/sub message.

    Args:
        payload: Mapping whose ``'data'`` entry holds the JSON-encoded
            command, either as bytes or as an already-decoded ``str``.

    Returns:
        The object obtained by decoding the JSON text (a dict for messages
        produced by ``send_command``).

    Raises:
        AttributeError: If ``'data'`` is missing (``None``) or is neither
            ``str`` nor an object with a ``decode()`` method.
    """
    data = payload.get('data')
    if isinstance(data, str):
        # Data is already text (e.g. the connection decodes responses),
        # so calling .decode() would fail; parse it directly.
        return json.loads(data)
    return json.loads(data.decode())
def send_shutdown_command(connection, worker_name):
    """Publish the ``'shutdown'`` command to the named worker.

    Args:
        connection: Connection passed through to ``send_command``.
        worker_name: Name of the worker that should receive the command.
    """
    send_command(connection, worker_name, command='shutdown')
def send_kill_horse_command(connection, worker_name):
    """Publish the ``'kill-horse'`` command, telling a worker to kill its horse.

    Args:
        connection: Connection passed through to ``send_command``.
        worker_name: Name of the worker that should receive the command.
    """
    send_command(connection, worker_name, command='kill-horse')
def send_stop_job_command(connection, job_id):
    """Ask the worker currently running a job to stop it.

    Args:
        connection: Connection used both to fetch the job and to publish
            the command.
        job_id: Identifier of the job to stop.

    Raises:
        InvalidJobOperation: If the fetched job has no ``worker_name``,
            i.e. it is not currently executing.
    """
    target_worker = Job.fetch(job_id, connection=connection).worker_name
    if target_worker:
        send_command(connection, target_worker, 'stop-job', job_id=job_id)
    else:
        raise InvalidJobOperation('Job is not currently executing')
def handle_command(worker, payload):
    """Dispatch a parsed command payload to the matching handler.

    Args:
        worker: Worker object forwarded to the selected handler.
        payload: Mapping with a ``'command'`` key; it is forwarded to the
            handlers that take it.

    Commands other than ``'stop-job'``, ``'shutdown'`` and ``'kill-horse'``
    are ignored. A payload without ``'command'`` raises ``KeyError``.
    """
    command = payload['command']
    if command == 'stop-job':
        handle_stop_job_command(worker, payload)
        return
    if command == 'shutdown':
        handle_shutdown_command(worker)
        return
    if command == 'kill-horse':
        handle_kill_worker_command(worker, payload)
def handle_shutdown_command(worker):
    """Handle the shutdown command by sending SIGINT to the current process.

    Args:
        worker: Worker whose ``log`` is used to record the event.
    """
    worker.log.info('Received shutdown command, sending SIGINT signal.')
    # Signal our own process; presumably the worker's SIGINT handling then
    # performs the actual shutdown -- confirm against the worker code.
    os.kill(os.getpid(), signal.SIGINT)
def handle_kill_worker_command(worker, payload):
    """Kill the worker's horse if one is currently running.

    Args:
        worker: Worker exposing ``log``, ``horse_pid`` and ``kill_horse()``.
        payload: Parsed command payload; not used by this handler.
    """
    log = worker.log
    log.info('Received kill horse command.')
    if not worker.horse_pid:
        # No horse process recorded, so there is nothing to kill.
        log.info('Worker is not working, kill horse command ignored')
        return
    log.info('Kiling horse...')
    worker.kill_horse()
def handle_stop_job_command(worker, payload):
    """Kill the horse when the worker is running the job named in the payload.

    Args:
        worker: Worker exposing ``log``, ``get_current_job_id()`` and
            ``kill_horse()``.
        payload: Parsed command payload; the job to stop is read from its
            ``'job_id'`` entry.
    """
    requested_id = payload.get('job_id')
    worker.log.debug('Received command to stop job %s', requested_id)
    # Ignore the command unless a job id was given and it matches the job
    # this worker is currently executing.
    if not requested_id or worker.get_current_job_id() != requested_id:
        worker.log.info('Not working on job %s, command ignored.', requested_id)
        return
    worker.kill_horse()
|