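"""
Job queue worker.

Pull requests from a jobqueue dispatcher, run each job in a child
process, and post the results (plus any files left in the job's store
directory) back to the dispatcher.

Run with the queue name as the first command line argument and an
optional dispatcher URL as the second (defaults to DEFAULT_DISPATCHER).
"""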
try:
    # Python 3
    from _thread import start_new_thread
except ImportError:
    # Python 2
    from thread import start_new_thread
import os, sys
import logging
import traceback
import time
from multiprocessing import Process
from jobqueue import runjob, store
from jobqueue.client import connect
store.ROOT = "/tmp/worker/%s"   # job store location; "%s" is filled in by the store
DEFAULT_DISPATCHER = "http://reflectometry.org/queue"
POLLRATE = 10   # seconds between polls of the running job and the dispatcher
def log_errors(f):
    """Decorator that logs any exception raised by *f* instead of letting it escape."""
    def wrapped(*args, **kw):
        try:
            return f(*args, **kw)
        except Exception:
            exc_type, exc_value, exc_trace = sys.exc_info()
            trace = traceback.format_tb(exc_trace)
            message = traceback.format_exception_only(exc_type, exc_value)
            logging.error("".join(message + trace))
    return wrapped
def wait_for_result(remote, id, process, queue):
    """
    Wait for job processing to finish.  Meanwhile, prefetch the next
    request.
    """
    next_request = {"request": None}
    canceling = False
    while True:
        # Check if the process is complete
        process.join(POLLRATE)
        if not process.is_alive():
            break
        # Check that the job is still active, and that it hasn't been
        # canceled or had results reported back from a second worker.
        # If the remote server is down, assume the job is still active.
        try:
            response = remote.status(id)
        except Exception:
            response = None
        if response and response["status"] != "ACTIVE":
            #print("canceling process")
            process.terminate()
            canceling = True
            break
        # Prefetch the next job; this strategy works well if there is
        # only one worker.  If there are many, we may want to leave it
        # for another worker to process.
        if not next_request["request"]:
            # Ignore remote-server-down errors
            try:
                next_request = remote.nextjob(queue=queue)
            except Exception:
                pass
    # Grab results from the store
    try:
        results = runjob.results(id)
    except KeyError:
        if canceling:
            results = {"status": "CANCEL", "message": "Job canceled"}
        else:
            results = {"status": "ERROR", "message": "Results not found"}
    #print("returning results", results)
    return results, next_request
@log_errors
def update_remote(dispatcher, id, queue, results):
    """
    Update the remote server with results.
    """
    #print("updating remote")
    path = store.path(id)
    # Remove the results key, if it is there
    try:
        store.delete(id, "results")
    except KeyError:
        pass
    files = [os.path.join(path, f) for f in os.listdir(path)]
    #print("sending results", results)
    # This is done with a separate connection to the server so that it can
    # run inside a thread.  That way the server can start the next job
    # while the megabytes of results are being transferred in the background.
    private_remote = connect(dispatcher)
    private_remote.postjob(id=id, results=results, queue=queue, files=files)
    # Clean up files
    for f in files:
        os.unlink(f)
    os.rmdir(path)
def serve(dispatcher, queue):
    """
    Run the work server.
    """
    assert queue is not None
    next_request = {"request": None}
    remote = connect(dispatcher)
    while True:
        if not next_request["request"]:
            try:
                next_request = remote.nextjob(queue=queue)
            except Exception:
                logging.error(traceback.format_exc())
        if next_request["request"]:
            jobid = next_request["id"]
            if jobid is None:
                logging.error("request has no job id")
                next_request = {"request": None}
                continue
            logging.info("processing job %s" % jobid)
            # Run the job in a child process so that it can be terminated
            # if the dispatcher cancels it.
            process = Process(target=runjob.run, args=(jobid, next_request["request"]))
            process.start()
            results, next_request = wait_for_result(remote, jobid, process, queue)
            # Post results from a background thread so the next job can
            # start immediately.
            start_new_thread(update_remote, (dispatcher, jobid, queue, results))
        else:
            time.sleep(POLLRATE)
def main():
    try:
        os.nice(19)   # run at lowest priority; not available on all platforms
    except (AttributeError, OSError):
        pass
    if len(sys.argv) <= 1:
        print("Requires queue name")
        sys.exit(1)
    queue = sys.argv[1]
    dispatcher = sys.argv[2] if len(sys.argv) > 2 else DEFAULT_DISPATCHER
    serve(queue=queue, dispatcher=dispatcher)
if __name__ == "__main__":
    main()