1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
import threading
import time
import traceback
class ArloBackgroundWorker(threading.Thread):
    """Worker thread that runs queued callbacks by priority and due time.

    A job is a dict holding a ``"callback"`` callable, an ``"args"`` dict of
    keyword arguments for it and, optionally, a ``"run_every"`` interval in
    seconds that makes the job recurring. Jobs are stored per priority (the
    numerically lowest priority is scanned first) and keyed by
    ``(run_at, job_id)``, where ``run_at`` is a ``time.monotonic()`` based
    timestamp.
    """

    def __init__(self, arlo):
        """Create the worker; the thread itself is not started here.

        Args:
            arlo: Object whose ``error(message)`` method is used to report
                exceptions raised by job callbacks.
        """
        super().__init__()
        self._arlo = arlo
        self._id = 0
        # Guards every mutable attribute below and wakes the worker up when
        # the queue or the stop flag changes.
        self._lock = threading.Condition()
        # prio -> {(run_at, job_id): job}
        self._queue = {}
        self._stopThread = False
        # Id of the job whose callback is currently executing (None when
        # idle) and whether it was cancelled while executing.
        self._running_id = None
        self._running_cancelled = False

    def _next_id(self):
        """Return a new job id of the form ``"<sequence>:<monotonic time>"``."""
        self._id += 1
        return str(self._id) + ":" + str(time.monotonic())

    @staticmethod
    def _job_order(key):
        """Sort key for queue entries: due time first, then submission order.

        The job id is a string, so sorting on it directly would place
        ``"10:..."`` before ``"9:..."``; the numeric sequence prefix is used
        instead so jobs due at the same time run first-in, first-out.
        """
        run_at, job_id = key
        return run_at, int(job_id.partition(":")[0])

    def _run_next(self):
        """Run at most one due job; the caller must hold ``self._lock``.

        The lock is released while the callback executes and re-acquired
        before returning.

        Returns:
            None if a job was executed (the caller should scan again),
            otherwise the monotonic time at which to scan again: the earliest
            pending ``run_at`` found, capped at about 60 seconds from now.
        """
        now = time.monotonic()
        # never wait for more than a minute
        timeout = int(now + 60)

        # go by priority...
        for prio in sorted(self._queue):
            jobs = self._queue[prio]

            # jobs in particular priority, earliest first
            for key in sorted(jobs, key=self._job_order):
                run_at, job_id = key

                # Compare against the un-truncated clock: a fractional run_at
                # (produced by a fractional run_every) would otherwise be
                # "not due" here yet already expired for the caller's wait,
                # making the worker spin with the lock held.
                if run_at > now:
                    if run_at < timeout:
                        timeout = run_at
                    break

                job = jobs.pop(key)
                self._running_id = job_id
                self._running_cancelled = False

                # Let other threads queue and cancel jobs while this one runs.
                self._lock.release()
                try:
                    job["callback"](**job["args"])
                except Exception as e:
                    self._arlo.error(
                        "job-error={}\n{}".format(
                            type(e).__name__, traceback.format_exc()
                        )
                    )
                finally:
                    # Always get the lock back, even if error reporting
                    # itself failed, so the caller's ``with`` stays balanced.
                    self._lock.acquire()
                    cancelled = self._running_cancelled
                    self._running_id = None
                    self._running_cancelled = False

                # reschedule, unless it was cancelled while running
                run_every = job.get("run_every", None)
                if run_every and not cancelled:
                    jobs[(run_at + run_every, job_id)] = job

                # start going through list again
                return None

        return timeout

    def run(self):
        """Thread body: run due jobs and sleep until the next one is due.

        Returns once ``stop()`` has set the stop flag; the flag is checked
        between jobs so a backlog of due jobs cannot delay shutdown.
        """
        with self._lock:
            while not self._stopThread:
                timeout = self._run_next()
                if timeout is None:
                    # A job ran; rescan right away.
                    continue

                # wait or get going?
                now = time.monotonic()
                if now < timeout:
                    self._lock.wait(timeout - now)

    def queue_job(self, run_at, prio, job):
        """Add a job to the queue and wake the worker.

        Args:
            run_at: ``time.monotonic()`` based time at which to run the job;
                truncated to whole seconds.
            prio: Priority; numerically lower priorities are scanned first.
            job: Job dict, see the class description.

        Returns:
            The id of the new job, usable with ``stop_job()``.
        """
        run_at = int(run_at)
        with self._lock:
            job_id = self._next_id()
            self._queue.setdefault(prio, {})[(run_at, job_id)] = job
            self._lock.notify()
        return job_id

    def stop_job(self, to_delete):
        """Cancel a job by id.

        A queued job is removed. If the job's callback is executing right
        now, it is left to finish but will not be rescheduled.

        Args:
            to_delete: Job id returned by ``queue_job()``.

        Returns:
            True if the id matched a queued or currently executing job,
            False otherwise.
        """
        with self._lock:
            for jobs in self._queue.values():
                for key in jobs:
                    if key[1] == to_delete:
                        # Safe: we return immediately after mutating.
                        del jobs[key]
                        return True
            if self._running_id is not None and self._running_id == to_delete:
                self._running_cancelled = True
                return True
        return False

    def stop(self):
        """Ask the worker to exit and wait up to 10 seconds for it.

        Joining is skipped when the thread is not running or when this is
        called from the worker thread itself (for example from a job), since
        ``join()`` raises ``RuntimeError`` in both situations.
        """
        with self._lock:
            self._stopThread = True
            self._lock.notify()
        if self.is_alive() and threading.current_thread() is not self:
            self.join(10)
class ArloBackground:
    """Front end for submitting callbacks to a background worker thread.

    Each ``run*`` method wraps the callback and its keyword arguments in a
    job dict, hands it to the worker's ``queue_job()`` and returns whatever
    that call returns, which can later be passed to ``cancel()``. The
    ``*_high`` / plain / ``*_low`` variants differ only in the priority
    value given to the worker (10, 40 and 99 respectively).
    """

    # Priority values handed to the worker queue.
    _PRIO_HIGH = 10
    _PRIO_NORMAL = 40
    _PRIO_LOW = 99

    def __init__(self, arlo):
        """Create and start the daemon worker thread.

        Args:
            arlo: Passed to the worker; its ``debug(message)`` method is
                called once the worker has been started.
        """
        worker = ArloBackgroundWorker(arlo)
        worker.name = "ArloBackgroundWorker"
        worker.daemon = True
        self._worker = worker
        worker.start()
        arlo.debug("background: starting")

    # -- run as soon as possible ------------------------------------------

    def _run(self, bg_cb, prio, **kwargs):
        """Queue ``bg_cb(**kwargs)`` at priority ``prio``, due immediately."""
        return self._worker.queue_job(
            time.monotonic(), prio, dict(callback=bg_cb, args=kwargs)
        )

    def run_high(self, bg_cb, **kwargs):
        """Queue ``bg_cb(**kwargs)`` at high priority, due immediately."""
        return self._run(bg_cb, self._PRIO_HIGH, **kwargs)

    def run(self, bg_cb, **kwargs):
        """Queue ``bg_cb(**kwargs)`` at normal priority, due immediately."""
        return self._run(bg_cb, self._PRIO_NORMAL, **kwargs)

    def run_low(self, bg_cb, **kwargs):
        """Queue ``bg_cb(**kwargs)`` at low priority, due immediately."""
        return self._run(bg_cb, self._PRIO_LOW, **kwargs)

    # -- run once after a delay -------------------------------------------

    def _run_in(self, bg_cb, prio, seconds, **kwargs):
        """Queue ``bg_cb(**kwargs)`` at ``prio``, due ``seconds`` from now."""
        due = time.monotonic() + seconds
        return self._worker.queue_job(
            due, prio, dict(callback=bg_cb, args=kwargs)
        )

    def run_high_in(self, bg_cb, seconds, **kwargs):
        """Queue a high priority job due ``seconds`` from now."""
        return self._run_in(bg_cb, self._PRIO_HIGH, seconds, **kwargs)

    def run_in(self, bg_cb, seconds, **kwargs):
        """Queue a normal priority job due ``seconds`` from now."""
        return self._run_in(bg_cb, self._PRIO_NORMAL, seconds, **kwargs)

    def run_low_in(self, bg_cb, seconds, **kwargs):
        """Queue a low priority job due ``seconds`` from now."""
        return self._run_in(bg_cb, self._PRIO_LOW, seconds, **kwargs)

    # -- run repeatedly ---------------------------------------------------

    def _run_every(self, bg_cb, prio, seconds, **kwargs):
        """Queue a job first due ``seconds`` from now, tagged to repeat.

        The interval is stored in the job under ``"run_every"``.
        """
        due = time.monotonic() + seconds
        return self._worker.queue_job(
            due, prio, dict(run_every=seconds, callback=bg_cb, args=kwargs)
        )

    def run_high_every(self, bg_cb, seconds, **kwargs):
        """Queue a high priority job repeating every ``seconds``."""
        return self._run_every(bg_cb, self._PRIO_HIGH, seconds, **kwargs)

    def run_every(self, bg_cb, seconds, **kwargs):
        """Queue a normal priority job repeating every ``seconds``."""
        return self._run_every(bg_cb, self._PRIO_NORMAL, seconds, **kwargs)

    def run_low_every(self, bg_cb, seconds, **kwargs):
        """Queue a low priority job repeating every ``seconds``."""
        return self._run_every(bg_cb, self._PRIO_LOW, seconds, **kwargs)

    # -- housekeeping -----------------------------------------------------

    def cancel(self, to_delete):
        """Ask the worker to drop job ``to_delete``; ``None`` is ignored."""
        if to_delete is None:
            return
        self._worker.stop_job(to_delete)

    def stop(self):
        """Stop the worker thread."""
        self._worker.stop()
|