File: simplequeue.py

package info (click to toggle)
python-bumps 1.0.3-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,200 kB
  • sloc: python: 24,517; xml: 493; ansic: 373; makefile: 211; javascript: 99; sh: 94
file content (94 lines) | stat: -rw-r--r-- 3,001 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import threading
from multiprocessing import Process

from . import runjob, jobid, store


class Scheduler(object):
    """Run submitted jobs one at a time, each in its own process.

    Jobs are queued in submission order.  A monitor thread pops the oldest
    pending job, runs ``runjob.run`` for it in a ``multiprocessing.Process``,
    waits for that process to exit and then records ``runjob.results``.

    All bookkeeping containers are guarded by ``self._lock``.

    Status strings written by this class are ``"PENDING"``, ``"ACTIVE"`` and
    ``"CANCEL"``; ``"UNKNOWN"`` is reported for ids that have no entry.  Any
    other status comes from the ``"status"`` entry of the job results.

    NOTE: the monitor thread loops forever and is not explicitly marked as a
    daemon thread.
    """

    def __init__(self):
        """Create an empty queue and start the monitor thread."""
        self._lock = threading.Lock()
        # Set by submit() to wake the monitor; cleared when the queue drains.
        self._nextjob = threading.Event()
        self._jobs = []  # every known job id, in submission order
        self._pending = []  # ids waiting to run, oldest first
        self._info = {}  # id -> request dict
        self._status = {}  # id -> status string
        self._results = {}  # id -> results dict
        # State of the job currently running.  These must all exist before
        # the monitor thread starts, since both the monitor and cancel()
        # read them.
        self._current_id = None
        self._current_process = None
        self._stopping = None
        self._jobmonitor = threading.Thread(target=self._run_queue)
        self._jobmonitor.start()

    def _run_queue(self):
        """Monitor thread body: run pending jobs one at a time, forever."""
        while True:
            self._nextjob.wait()
            with self._lock:
                if not self._pending:
                    # Nothing to do; sleep until the next submit().
                    self._nextjob.clear()
                    continue
                job_id = self._pending.pop(0)
                self._current_id = job_id
                self._status[job_id] = "ACTIVE"
                self._stopping = None
                process = Process(
                    target=runjob.run,
                    args=(job_id, self._info[job_id]),
                )
                self._current_process = process
                # Start while still holding the lock so that cancel() can
                # never observe (and try to terminate) an unstarted process.
                process.start()

            # Wait without the lock so other methods stay responsive.
            process.join()

            with self._lock:
                # The job is no longer running, so cancel() must not try to
                # terminate it any more.
                self._current_id = None
                self._current_process = None
                if job_id not in self._info:
                    # Deleted while running: do not resurrect its entries.
                    continue

            results = runjob.results(job_id)

            with self._lock:
                if job_id not in self._info:
                    # Deleted while the results were being fetched.
                    continue
                self._results[job_id] = results
                # Keep a cancellation visible instead of overwriting it with
                # whatever status the results report.
                if self._status.get(job_id) != "CANCEL":
                    self._status[job_id] = results["status"]

    def jobs(self, status=None):
        """Return a list of job ids, optionally filtered by status.

        With ``status=None`` every known id is returned in submission order;
        otherwise only the ids whose current status equals *status*.
        """
        with self._lock:
            if status is None:
                response = self._jobs[:]
            else:
                response = [j for j in self._jobs if self._status[j] == status]
        return response

    def submit(self, request, origin):
        """Queue *request* for execution and return its new integer job id.

        The request is saved with ``store.put`` under the key ``"request"``
        and then gains an ``"id"`` entry (the dict passed in is modified).
        *origin* is accepted but not used here.
        """
        with self._lock:
            id = int(jobid.get_jobid())
            store.create(id)
            store.put(id, "request", request)
            request["id"] = id
            self._jobs.append(id)
            self._info[id] = request
            self._status[id] = "PENDING"
            self._results[id] = {"status": "PENDING"}
            self._pending.append(id)
            # Wake the monitor thread.
            self._nextjob.set()
        return id

    def results(self, id):
        """Return the results dict for *id*, or ``{"status": "UNKNOWN"}``."""
        with self._lock:
            return self._results.get(id, {"status": "UNKNOWN"})

    def status(self, id):
        """Return the status string for *id*, or ``"UNKNOWN"``."""
        with self._lock:
            return self._status.get(id, "UNKNOWN")

    def info(self, id):
        """Return the request dict for *id*.

        Raises:
            KeyError: if *id* is not a known job.
        """
        with self._lock:
            return self._info[id]

    def cancel(self, id):
        """Cancel job *id*, whether it is pending or currently running.

        A pending job is removed from the queue; a running job has its
        process terminated (at most once).  The status of a known job becomes
        ``"CANCEL"``.  Unknown ids are ignored.
        """
        with self._lock:
            try:
                self._pending.remove(id)
            except ValueError:
                # Not waiting in the queue; it may be running or finished.
                pass
            if self._current_id == id and self._stopping != id:
                self._stopping = id
                self._current_process.terminate()
            # Only touch jobs we know about, so that an unknown id keeps
            # reporting "UNKNOWN" and no stray entry is left behind.
            if id in self._status:
                self._status[id] = "CANCEL"

    def delete(self, id):
        """Cancel job *id*, forget it, and destroy its stored data."""
        self.cancel(id)
        with self._lock:
            try:
                self._jobs.remove(id)
            except ValueError:
                pass
            self._info.pop(id, None)
            self._results.pop(id, None)
            self._status.pop(id, None)
        store.destroy(id)