1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
import time
import sys
import six.moves.queue
from threading import Lock
from multiprocessing.pool import ThreadPool
# Serializes access to the _pools registry; get_pool(), stop_pools() and
# stop_pool() all hold it while reading or modifying the dict.
_init_lock = Lock()
# Registry of the thread pools created by get_pool(), keyed by pool name.
_pools = {}
class Job(object):
    """A unit of work to be executed by a pool.

    A job wraps a callable together with the positional and keyword
    arguments it should be called with.  Calling :meth:`run` invokes the
    callable and records the outcome on the job itself:

    * ``result`` holds the return value (``None`` until the job has run
      successfully).
    * ``exception`` holds the exception instance if the callable raised an
      ``Exception`` (``None`` otherwise).
    * ``exception_info`` holds the matching ``sys.exc_info()`` triple
      (``None`` otherwise).

    ``description`` is a human readable label and is what ``str(job)``
    returns.
    """

    __slots__ = (
        'func', 'description',
        'args', 'kwargs', 'result',
        'exception', 'exception_info',
    )

    def __init__(self, func, description, *args, **kwargs):
        """Store the callable, its label and the arguments to call it with."""
        self.func = func
        self.args = args
        self.description = description
        self.kwargs = kwargs
        self.result = None
        self.exception = None
        # Every slot must be assigned: reading an unassigned slot raises
        # AttributeError, which would make ``job.exception_info`` unusable
        # on jobs that have not failed.
        self.exception_info = None

    def __str__(self):
        return self.description

    def run(self):
        """Call the wrapped function, capturing its result or exception.

        Exceptions derived from ``Exception`` are not propagated; they are
        stored on the job so the caller can inspect them afterwards.
        """
        try:
            self.result = self.func(*self.args, **self.kwargs)
        except Exception as e:
            self.exception_info = sys.exc_info()
            self.exception = e
def get_pool(name="default", thread_count=1):
    """Return the thread pool registered under *name*, creating it if needed.

    A falsy ``thread_count`` (e.g. 0) means "no pool": ``None`` is returned
    and the registry is not consulted.

    When a pool with this name already exists it is returned as-is and
    ``thread_count`` is ignored; otherwise a new ``ThreadPool`` with
    ``thread_count`` workers is created and registered.
    """
    if thread_count:
        # The lookup and the creation happen under one lock acquisition so
        # that two callers cannot both create a pool for the same name.
        with _init_lock:
            registered = _pools.get(name)
            if registered is not None:
                return registered
            created = ThreadPool(thread_count)
            _pools[name] = created
            return created
    return None
def stop_pools():
    """Remove every pool from the registry and call ``close()`` on it.

    Each pool is taken out of the registry before it is closed.
    """
    with _init_lock:
        # Iterate over a snapshot of the names: the dict shrinks as we go.
        for pool_name in tuple(_pools):
            _pools.pop(pool_name).close()
def stop_pool(name="default"):
    """Call ``close()`` on the pool registered as *name* and unregister it.

    Raises ``KeyError`` if no pool is registered under that name.
    """
    with _init_lock:
        target = _pools[name]
        # Close first, then drop the registry entry.
        target.close()
        del _pools[name]
class PoolTimeoutError(Exception):
    """Raised by pool_exec when the jobs do not all finish before the timeout."""
def pool_exec(pool, jobs, timeout):
    """Execute jobs, yielding each one once it has been run.

    This is a generator: nothing is submitted or executed, and the timeout
    clock does not start, until it is first advanced.

    Args:
        pool: A pool object providing ``apply_async`` (such as the one
            returned by ``get_pool``), or a falsy value to run the jobs
            synchronously in the calling thread.
        jobs: Any iterable of objects with a ``run()`` method.  It does not
            need to support ``len()``.
        timeout: Maximum number of seconds, measured from the first
            iteration, allowed for all jobs.  ``None`` disables the
            deadline.

    Yields:
        Each job after its ``run()`` has returned.  With a pool, jobs are
        yielded in completion order; without one, in input order.

    Raises:
        PoolTimeoutError: If the deadline passes before every job has been
            yielded.  With a pool, jobs that were already submitted are not
            cancelled by this function.  When running synchronously the
            deadline is only checked before each job starts.
    """
    start = time.time()
    deadline = None if timeout is None else start + timeout

    def _timeout_error():
        # Build the error lazily so the elapsed time is measured at the
        # moment the timeout is detected.
        return PoolTimeoutError("Timed out after %fs" % (time.time() - start))

    if pool:
        finished = six.moves.queue.Queue()

        def pool_executor(job):
            job.run()
            finished.put(job)

        # Count the submissions instead of calling len(jobs): this keeps
        # generators and other unsized iterables working, as they already
        # do in the synchronous branch.
        pending = 0
        for job in jobs:
            pool.apply_async(func=pool_executor, args=[job])
            pending += 1

        while pending:
            if deadline is None:
                wait_time = None  # block until the next job completes
            else:
                wait_time = max(0, deadline - time.time())
            try:
                job = finished.get(True, wait_time)
            except six.moves.queue.Empty:
                raise _timeout_error()
            pending -= 1
            yield job
    else:
        for job in jobs:
            if deadline is not None and time.time() > deadline:
                raise _timeout_error()
            job.run()
            yield job
|