File: simplequeue.py

package info (click to toggle)
python-bumps 0.7.11-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 10,264 kB
  • sloc: python: 22,226; ansic: 4,973; cpp: 4,849; xml: 493; makefile: 163; perl: 108; sh: 101
file content (83 lines) | stat: -rw-r--r-- 2,970 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import threading
from multiprocessing import Process

from . import runjob, jobid, store

class Scheduler(object):
    def __init__(self):
        self._lock = threading.Lock()
        self._nextjob = threading.Event()
        self._jobs = []
        self._pending = []
        self._info = {}
        self._status = {}
        self._results = {}
        self._jobmonitor = threading.Thread(target=self._run_queue)
        self._jobmonitor.start()
        self._current_id = None
    def _run_queue(self):
        while True:
            self._nextjob.wait()
            with self._lock:
                if not self._pending:
                    self._nextjob.clear()
                    continue
                self._current_id = self._pending.pop(0)
                self._status[self._current_id] = 'ACTIVE'
                request = self._info[self._current_id]
                self._stopping = None
                self._current_process = Process(target=runjob.run,
                                                args=(self._current_id,request))
            self._current_process.start()
            self._current_process.join()
            results = runjob.results(self._current_id)
            with self._lock:
                self._results[self._current_id] = results
                self._status[self._current_id] = results['status']

    def jobs(self, status=None):
        with self._lock:
            if status is None:
                response = self._jobs[:]
            else:
                response = [j for j in self._jobs if self._status[j] == status]
        return response
    def submit(self, request, origin):
        with self._lock:
            id = int(jobid.get_jobid())
            store.create(id)
            store.put(id,'request',request)
            request['id'] = id
            self._jobs.append(id)
            self._info[id] = request
            self._status[id] = 'PENDING'
            self._results[id] = {'status':'PENDING'}
            self._pending.append(id)
            self._nextjob.set()
        return id
    def results(self, id):
        with self._lock:
            return self._results.get(id,{'status':'UNKNOWN'})
    def status(self, id):
        with self._lock:
            return self._status.get(id,'UNKNOWN')
    def info(self, id):
        with self._lock:
            return self._info[id]
    def cancel(self, id):
        with self._lock:
            try: self._pending.remove(id)
            except ValueError: pass
            if self._current_id == id and not self._stopping == id:
                self._stopping = id
                self._current_process.terminate()
            self._status[id] = 'CANCEL'
    def delete(self, id):
        self.cancel(id)
        with self._lock:
            try: self._jobs.remove(id)
            except ValueError: pass
            self._info.pop(id, None)
            self._results.pop(id, None)
            self._status.pop(id, None)
        store.destroy(id)