File: pool.py

package info (click to toggle)
graphite-web 1.1.8-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 7,592 kB
  • sloc: javascript: 86,823; python: 11,977; sh: 61; makefile: 50
file content (125 lines) | stat: -rw-r--r-- 3,182 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import time
import sys

import six.moves.queue

from threading import Lock
from multiprocessing.pool import ThreadPool

# Held by get_pool, stop_pools and stop_pool whenever they read or modify _pools.
_init_lock = Lock()
# Registry of created pools: pool name -> ThreadPool, populated lazily by get_pool.
_pools = {}


class Job(object):
    """A job to be executed by a pool.

    The job wraps a function together with the positional and keyword
    arguments it should be called with, plus a human-readable description
    that is returned by ``str(job)``.

    Calling ``run()`` invokes the function with the stored arguments:

    * on success the return value is stored in ``result``;
    * if the function raises an ``Exception`` it is not propagated; the
      exception instance is stored in ``exception`` and the matching
      ``sys.exc_info()`` triple in ``exception_info``.

    ``result``, ``exception`` and ``exception_info`` are all ``None`` until
    ``run()`` sets them.
    """
    __slots__ = (
        'func', 'description',
        'args', 'kwargs', 'result',
        'exception', 'exception_info',
    )

    def __init__(self, func, description, *args, **kwargs):
        self.func = func
        self.description = description
        self.args = args
        self.kwargs = kwargs
        self.result = None
        self.exception = None
        # A slot that was never assigned raises AttributeError on read, so give
        # exception_info the same None default as the other outcome fields.
        self.exception_info = None

    def __str__(self):
        return self.description

    def run(self):
        """Call the wrapped function, recording its result or its exception."""
        try:
            self.result = self.func(*self.args, **self.kwargs)
        except Exception as e:
            # Keep the full (type, value, traceback) triple for callers that
            # want to log or re-raise with the original traceback.
            self.exception_info = sys.exc_info()
            self.exception = e


def get_pool(name="default", thread_count=1):
    """Return the thread pool registered under ``name``, creating it if needed.

    A falsy ``thread_count`` (such as 0) means "no pool": None is returned
    and the registry is left untouched.

    Otherwise the pool is looked up by name while holding the module lock.
    If no pool exists yet, a ThreadPool with ``thread_count`` workers is
    created and registered. For a pool that already exists ``thread_count``
    has no effect.
    """
    if thread_count:
        with _init_lock:
            existing = _pools.get(name)
            if existing is not None:
                return existing
            created = ThreadPool(thread_count)
            _pools[name] = created
            return created
    return None


def stop_pools():
    """Remove every pool from the registry and call ``close()`` on each.

    The whole operation runs while holding the module lock.
    """
    with _init_lock:
        # Iterate over a snapshot of the names because the dict shrinks as we go.
        for pool_name in tuple(_pools):
            _pools.pop(pool_name).close()


def stop_pool(name="default"):
    """Call ``close()`` on the pool registered under ``name`` and unregister it.

    Runs while holding the module lock. Raises KeyError if no pool is
    registered under ``name``.
    """
    with _init_lock:
        target = _pools[name]
        target.close()
        # Only drop the registry entry once close() has returned.
        del _pools[name]


class PoolTimeoutError(Exception):
    """Raised by pool_exec when jobs are still outstanding at the deadline."""


def pool_exec(pool, jobs, timeout):
    """Execute jobs, yielding each one as it completes.

    This is a generator: nothing is submitted or run until iteration starts.

    Arguments:
      pool: a pool object exposing ``apply_async`` (such as the ThreadPool
        returned by get_pool), or a falsy value to run synchronously.
      jobs: any iterable of objects with a ``run()`` method. It does not need
        to support ``len()``, so generators are accepted in both modes.
      timeout: number of seconds, measured from the start of iteration, after
        which unfinished work is reported as a timeout.

    With a pool, every job is submitted up front and jobs are yielded in
    completion order. Without a pool, jobs are run and yielded one at a time
    in input order.

    Raises:
      PoolTimeoutError: if not all jobs have been yielded once the timeout has
        elapsed. In synchronous mode the deadline is only checked before each
        job is started, so a running job is never interrupted.
    """
    start = time.time()
    deadline = start + timeout
    if pool:
        queue = six.moves.queue.Queue()

        def pool_executor(job):
            job.run()
            queue.put(job)

        # Count submissions rather than calling len(jobs) afterwards, so that
        # iterables without a length work here just as in the synchronous branch.
        total = 0
        for job in jobs:
            pool.apply_async(func=pool_executor, args=[job])
            total += 1

        for _ in range(total):
            # Never pass a negative timeout to Queue.get(); once the deadline
            # has passed, only jobs that are already queued can be collected.
            wait_time = max(0, deadline - time.time())
            try:
                job = queue.get(True, wait_time)
            except six.moves.queue.Empty:
                raise PoolTimeoutError("Timed out after %fs" % (time.time() - start))

            yield job
    else:
        for job in jobs:
            if time.time() > deadline:
                raise PoolTimeoutError("Timed out after %fs" % (time.time() - start))

            job.run()
            yield job