1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
import time
import json
from . import rest
json_content = 'application/json'
class Connection(object):
def __init__(self, url):
self.rest = rest.Connection(url)
def jobs(self, status=None):
"""
List jobs on the server according to status.
"""
if status is None:
response = self.rest.get('/jobs.json')
else:
response = self.rest.get('/jobs/%s.json'%status.lower())
return _process_response(response)['jobs']
def submit(self, job):
"""
Submit a job to the server.
"""
body = json.dumps(job)
response = self.rest.post('/jobs.json',
mimetype=json_content,
body=body)
return _process_response(response)
def info(self, id):
"""
Return the job structure associated with id.
Raises ValueError if job not found.
Raises IOError if communication error.
"""
response = self.rest.get('/jobs/%s.json'%id)
return _process_response(response)
def status(self, id):
"""
Return the job structure associated with id.
Raises ValueError if job not found.
Raises IOError if communication error.
"""
response = self.rest.get('/jobs/%s/status.json'%id)
return _process_response(response)
def output(self, id):
"""
Return the result from processing the job.
Raises ValueError if job not found.
Raises IOError if communication error.
Check response['status'] for 'COMPLETE','CANCEL','ERROR', etc.
"""
response = self.rest.get('/jobs/%s/results.json'%id)
return _process_response(response)
def wait(self, id, pollrate=300, timeout=60*60*24):
"""
Wait for job to complete, returning output.
*pollrate* is the number of seconds to sleep between checks
*timeout* is the maximum number of seconds to wait
Raises IOError if the timeout is exceeded.
Raises ValueError if job not found.
Raises IOError if communication error.
"""
start = time.clock()
while True:
results = self.output(id)
#print "waiting: result is",results
if results['status'] in ('PENDING', 'ACTIVE'):
#print "waiting for job %s"%id
if time.clock() - start > timeout:
raise IOError('job %s is still pending'%id)
time.sleep(pollrate)
else:
#print "status for %s is"%id,results['status'],'- wait complete'
return results
def stop(self, id):
"""
Stop the job.
Raises ValueError if job not found.
Raises IOError if communication error.
"""
response = self.rest.post('/jobs/%s?action=stop'%id)
return _process_response(response)
def delete(self, id):
"""
Delete the job and all associated files.
Raises ValueError if job not found.
Raises IOError if communication error.
"""
response = self.rest.delete('/jobs/%s.json'%id)
return _process_response(response)
def nextjob(self, queue):
"""
Fetch the next job to process from the queue.
"""
# TODO: combine status check and prefetch to reduce traffic
# TODO: worker sends active and pending jobs so we can load balance
body = json.dumps({'queue': queue})
response = self.rest.post('/jobs/nextjob.json',
mimetype=json_content,
body=body)
return _process_response(response)
def postjob(self, queue, id, results, files):
"""
Return results from a processed job.
"""
# TODO: sign request
fields = {'queue': queue, 'results': json.dumps(results)}
response = self.rest.postfiles('/jobs/%s/postjob'%id,
files=files,
fields=fields)
return _process_response(response)
def putfiles(self, id, files):
# TODO: sign request
response = self.rest.putfiles('/jobs/%s/data/'%id,
files=files)
return _process_response(response)
def _process_response(response):
headers, body = response
#print "response",response[body]
if headers['status'] == '200':
return json.loads(body)
else:
err = headers['status']
msg = rest.RESPONSE.get(err,("Unknown","Unknown code"))[1]
raise IOError("server response %s %s"%(err,msg))
def connect(url):
return Connection(url)
|