File: worker.py

Package: python-bumps 1.0.3-3
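"""
Job queue worker.

Poll a jobqueue dispatcher for job requests, run each job in a child
process, and post the results back to the server when the job finishes.

Usage: worker.py <queue> [dispatcher]
"""
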
try:
    from _thread import start_new_thread
except ImportError:
    # Python 2 fallback
    from thread import start_new_thread

import os
import sys
import logging
import traceback
import time
from multiprocessing import Process

from jobqueue import runjob, store
from jobqueue.client import connect

store.ROOT = "/tmp/worker/%s"  # per-job storage directory, keyed by job id
DEFAULT_DISPATCHER = "http://reflectometry.org/queue"
POLLRATE = 10  # seconds between status checks while a job is running
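
# Job files live under store.ROOT (one directory per job id) while a job
# runs; update_remote() removes them after the results are posted back.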


def log_errors(f):
    """
    Decorator that logs any exception raised by the wrapped function.

    Needed for functions launched with start_new_thread, where an uncaught
    exception would otherwise vanish without a trace.
    """

    def wrapped(*args, **kw):
        try:
            return f(*args, **kw)
        except Exception:
            exc_type, exc_value, exc_trace = sys.exc_info()
            trace = traceback.format_tb(exc_trace)
            message = traceback.format_exception_only(exc_type, exc_value)
            logging.error("".join(message + trace))

    return wrapped


def wait_for_result(remote, jobid, process, queue):
    """
    Wait for job processing to finish.  Meanwhile, prefetch the next
    request.
    """
    next_request = {"request": None}
    canceling = False
    while True:
        # Check if process is complete
        process.join(POLLRATE)
        if not process.is_alive():
            break

        # Check that the job is still active: it may have been canceled,
        # or a second worker may already have reported its results.
        # If the remote server is down, assume the job is still active.
        try:
            response = remote.status(jobid)
        except Exception:
            response = None
        if response and response["status"] != "ACTIVE":
            # print "canceling process"
            process.terminate()
            canceling = True
            break

        # Prefetch the next job; this strategy works well if there is
        # only one worker.  If there are many, we may want to leave it
        # for another worker to process.
        if not next_request["request"]:
            # Ignore errors when the remote server is down.
            try:
                next_request = remote.nextjob(queue=queue)
            except Exception:
                pass

    # Grab results from the store
    try:
        results = runjob.results(jobid)
    except KeyError:
        if canceling:
            results = {"status": "CANCEL", "message": "Job canceled"}
        else:
            results = {"status": "ERROR", "message": "Results not found"}

    # print "returning results",results
    return results, next_request


@log_errors
def update_remote(dispatcher, jobid, queue, results):
    """
    Update the remote server with the job results.
    """
    logging.debug("updating remote")
    path = store.path(jobid)
    # Remove the results key if it is present.
    try:
        store.delete(jobid, "results")
    except KeyError:
        pass
    files = [os.path.join(path, f) for f in os.listdir(path)]
    logging.debug("sending results %s" % results)
    # This is done with a separate connection to the server so that it can
    # run inside a thread.  That way the server can start the next job
    # while the megabytes of results are being transferred in the background.
    private_remote = connect(dispatcher)
    private_remote.postjob(id=jobid, results=results, queue=queue, files=files)
    # Clean up files
    for f in files:
        os.unlink(f)
    os.rmdir(path)


def serve(dispatcher, queue):
    """
    Run the work server.
    """
    assert queue is not None
    next_request = {"request": None}
    remote = connect(dispatcher)
    while True:
        if not next_request["request"]:
            try:
                next_request = remote.nextjob(queue=queue)
            except Exception:
                logging.error(traceback.format_exc())
        if next_request["request"]:
            jobid = next_request["id"]
            if jobid is None:
                logging.error("request has no job id")
                next_request = {"request": None}
                continue
            logging.info("processing job %s" % jobid)
            process = Process(target=runjob.run, args=(jobid, next_request["request"]))
            process.start()
            results, next_request = wait_for_result(remote, jobid, process, queue)
            start_new_thread(update_remote, (dispatcher, jobid, queue, results))
        else:
            time.sleep(POLLRATE)


def main():
    try:
        os.nice(19)  # run at low priority if the platform supports it
    except (AttributeError, OSError):
        pass
    if len(sys.argv) <= 1:
        sys.exit("Requires queue name")
    queue = sys.argv[1]
    dispatcher = sys.argv[2] if len(sys.argv) > 2 else DEFAULT_DISPATCHER
    serve(queue=queue, dispatcher=dispatcher)


if __name__ == "__main__":
    main()
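
# Example invocation (queue name "cluster" is hypothetical; the dispatcher
# argument is optional and defaults to DEFAULT_DISPATCHER above):
#
#   python worker.py cluster
#   python worker.py cluster http://example.org/queue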