File: _thread.py

package info (click to toggle)
python-futurist 3.2.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 464 kB
  • sloc: python: 2,134; makefile: 21; sh: 4
file content (130 lines) | stat: -rw-r--r-- 3,788 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import atexit
import queue
import threading
import weakref


class Threading:
    """Factories for the standard ``threading`` synchronization primitives.

    Every factory forwards its positional and keyword arguments unchanged
    to the matching ``threading`` constructor, which is looked up each time
    the factory is called.
    """

    @staticmethod
    def event_object(*args, **kwargs):
        """Return a new ``threading.Event``."""
        make_event = threading.Event
        return make_event(*args, **kwargs)

    @staticmethod
    def lock_object(*args, **kwargs):
        """Return a new ``threading.Lock``."""
        make_lock = threading.Lock
        return make_lock(*args, **kwargs)

    @staticmethod
    def rlock_object(*args, **kwargs):
        """Return a new ``threading.RLock``."""
        make_rlock = threading.RLock
        return make_rlock(*args, **kwargs)

    @staticmethod
    def condition_object(*args, **kwargs):
        """Return a new ``threading.Condition``."""
        make_condition = threading.Condition
        return make_condition(*args, **kwargs)


# Workers recorded by ThreadWorker.create_and_register(); _clean_up() drains
# this mapping to stop and join them. Only the keys matter (every value is
# True), and because the keys are weak, being recorded here does not by
# itself keep a worker object alive.
_to_be_cleaned = weakref.WeakKeyDictionary()
# Flipped to True by _clean_up(); ThreadWorker._is_dying() reads it so that
# workers stop looking for work once cleanup has begun.
_dying = False


class _Stopping(Exception):
    """Signal that a worker found no work and should shut down.

    Raised by ``ThreadWorker._wait_for_work`` and caught by
    ``ThreadWorker.run``, which then returns and ends the thread.
    """
    pass


class ThreadWorker(threading.Thread):
    """Daemon thread that runs work items pulled from a queue.

    The worker references its owning executor only weakly. It stops taking
    new work once :meth:`stop` has been called, the module-level ``_dying``
    flag is set, or the executor has been garbage collected.
    """

    # Seconds to block on the queue before re-checking whether to stop.
    MAX_IDLE_FOR = 1

    def __init__(self, executor, work_queue):
        """Create a worker (the thread is not started here).

        :param executor: owner of this worker; must be weak-referenceable,
                         as only a weak reference to it is stored.
        :param work_queue: source of work items; it is used through
                           ``get(block, timeout)`` (expected to raise
                           ``queue.Empty`` on timeout) and ``task_done()``,
                           and each item must expose ``run()``.
        """
        super().__init__()
        self.work_queue = work_queue
        self.should_stop = False
        # True only while the worker is blocked waiting for a work item.
        self.idle = False
        self.daemon = True
        # Ensure that when the owning executor gets cleaned up that these
        # threads also get shutdown (if they were not already shutdown).
        #
        # The callback deliberately reaches this worker through a weak
        # reference: capturing ``self`` strongly would create a
        # worker -> executor_ref -> callback -> worker cycle, so finished
        # workers could only be reclaimed by the cyclic garbage collector.
        self_ref = weakref.ref(self)

        def _stop_worker(_executor_ref):
            worker = self_ref()
            if worker is not None:
                worker.stop()

        self.executor_ref = weakref.ref(executor, _stop_worker)

    @classmethod
    def create_and_register(cls, executor, work_queue):
        """Create a worker and record it in ``_to_be_cleaned``.

        :returns: the new worker, which has not been started.
        """
        w = cls(executor, work_queue)
        # Ensure that on shutdown, if threads still exist that we get
        # around to cleaning them up and waiting for them to correctly stop.
        #
        # TODO(harlowja): use a weakrefset in the future, as we don't
        # really care about the values...
        _to_be_cleaned[w] = True
        return w

    def _is_dying(self):
        """Return True when this worker should stop taking new work."""
        if self.should_stop or _dying:
            return True
        # Only test the weak reference and never keep the executor bound
        # to a name: each executor references its known workers, so
        # holding on to it here would confuse the GC with a cycle.
        return self.executor_ref() is None

    def _wait_for_work(self):
        """Block until a work item is available and return it.

        The queue is polled in ``MAX_IDLE_FOR`` second slices so that a
        stop request is noticed while the queue stays empty.

        :raises _Stopping: if the queue was empty and the worker is dying.
        """
        self.idle = True
        work = None
        while work is None:
            try:
                work = self.work_queue.get(True, self.MAX_IDLE_FOR)
            except queue.Empty:
                if self._is_dying():
                    raise _Stopping()
        self.idle = False
        return work

    def stop(self):
        """Ask the worker to stop; it is only a flag and does not block."""
        self.should_stop = True

    def run(self):
        """Thread body: run queued work items until the worker is dying."""
        while not self._is_dying():
            try:
                work = self._wait_for_work()
            except _Stopping:
                return

            try:
                work.run()
            finally:
                # Mark task as done for queue.join() support
                self.work_queue.task_done()
                # Avoid any potential (self) references to the work item
                # in tracebacks or similar...
                del work


def _clean_up():
    """Ensure all threads that were created were destroyed cleanly.

    Sets the module-level ``_dying`` flag, asks every worker still recorded
    in ``_to_be_cleaned`` to stop and then waits for each running one to
    finish. Takes no arguments and returns ``None``.
    """
    global _dying
    _dying = True
    threads_to_wait_for = []
    while True:
        try:
            worker, _work_val = _to_be_cleaned.popitem()
        except KeyError:
            # Nothing left. Popping until KeyError (instead of testing for
            # emptiness first) stays correct even if a weakly-held entry
            # disappears between the check and the pop.
            break
        worker.stop()
        threads_to_wait_for.append(worker)
    while threads_to_wait_for:
        worker = threads_to_wait_for.pop()
        try:
            # Joining a thread that was never started raises RuntimeError,
            # which would abort this handler and leave the remaining
            # workers unjoined; a thread that is not alive needs no wait.
            if worker.is_alive():
                worker.join()
        finally:
            del worker


# Run _clean_up() when the interpreter exits so that any worker threads
# still registered are asked to stop and are waited for.
atexit.register(_clean_up)