1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import queue
import threading
import weakref
class Threading:
    """Namespace of factories for the standard ``threading`` primitives.

    Every factory forwards its positional and keyword arguments untouched
    to the matching ``threading`` constructor and returns the new object.
    """

    @staticmethod
    def event_object(*args, **kwargs):
        """Return a new ``threading.Event``."""
        return threading.Event(*args, **kwargs)

    @staticmethod
    def lock_object(*args, **kwargs):
        """Return a new ``threading.Lock``."""
        return threading.Lock(*args, **kwargs)

    @staticmethod
    def rlock_object(*args, **kwargs):
        """Return a new ``threading.RLock``."""
        return threading.RLock(*args, **kwargs)

    @staticmethod
    def condition_object(*args, **kwargs):
        """Return a new ``threading.Condition``."""
        return threading.Condition(*args, **kwargs)
# Registry of workers that _clean_up() must stop and join at interpreter
# exit. Keys are held weakly, so being registered here never keeps a worker
# object alive on its own; the values are unused placeholders.
_to_be_cleaned = weakref.WeakKeyDictionary()
# Module-wide shutdown flag: set to True by _clean_up() and polled by
# ThreadWorker._is_dying() so every worker sees the shutdown.
_dying = False
class _Stopping(Exception):
    """Internal signal raised by a worker that should exit its run loop."""
class ThreadWorker(threading.Thread):
    """Daemon thread that repeatedly takes work items off a queue and runs them.

    A worker keeps going until it is told to stop, the module-wide
    ``_dying`` flag is set, or the executor it was created for has been
    garbage collected.
    """

    # Timeout handed to ``work_queue.get()``; each time it expires with no
    # work available the worker re-checks whether it should shut down.
    MAX_IDLE_FOR = 1

    def __init__(self, executor, work_queue):
        """Create a (not yet started) worker bound to ``executor``.

        ``executor`` is only referenced weakly; ``work_queue`` is the queue
        this worker will pull work items from.
        """
        super().__init__()
        self.daemon = True
        self.idle = False
        self.should_stop = False
        self.work_queue = work_queue

        def _executor_collected(_ref):
            # Once the owning executor has been cleaned up this thread has
            # no reason to keep running (if it was not already stopped).
            self.stop()

        self.executor_ref = weakref.ref(executor, _executor_collected)

    @classmethod
    def create_and_register(cls, executor, work_queue):
        """Build a worker and record it for cleanup at interpreter exit."""
        worker = cls(executor, work_queue)
        # Registering means that, on shutdown, any thread that still exists
        # gets stopped and waited on until it finishes correctly.
        #
        # TODO(harlowja): use a weakrefset in the future, as we don't
        # really care about the values...
        _to_be_cleaned[worker] = True
        return worker

    def _is_dying(self):
        """Return True when this worker should exit its run loop."""
        if self.should_stop or _dying:
            return True
        # The referent is tested inline rather than bound to a local, so
        # this frame never keeps a strong reference to the executor (and
        # therefore never adds a reference cycle for the GC to untangle).
        return self.executor_ref() is None

    def _wait_for_work(self):
        """Block until a work item is available and return it.

        ``idle`` is True for the duration of the wait. Raises ``_Stopping``
        when a wait times out and the worker is found to be dying.
        """
        self.idle = True
        while True:
            try:
                item = self.work_queue.get(True, self.MAX_IDLE_FOR)
            except queue.Empty:
                if self._is_dying():
                    raise _Stopping()
                continue
            if item is not None:
                break
        self.idle = False
        return item

    def stop(self):
        """Ask the worker to exit; it notices on its next liveness check."""
        self.should_stop = True

    def run(self):
        """Thread body: run queued work items until the worker is dying."""
        while True:
            if self._is_dying():
                return
            try:
                item = self._wait_for_work()
            except _Stopping:
                return
            try:
                item.run()
            finally:
                # Always account for the item so queue.join() can return.
                self.work_queue.task_done()
                # Drop the local so tracebacks (or similar) cannot end up
                # holding on to the work item through this frame.
                del item
def _clean_up():
    """Ensure all threads that were created were destroyed cleanly.

    Sets the module-wide ``_dying`` flag, asks every worker still present
    in ``_to_be_cleaned`` to stop, and then waits for each of them to
    finish. Workers that cannot be joined (never started, or the calling
    thread itself) are skipped so the remaining workers are still waited
    on.
    """
    global _dying
    _dying = True
    threads_to_wait_for = []
    while _to_be_cleaned:
        try:
            worker, _work_val = _to_be_cleaned.popitem()
        except KeyError:
            # Entries are held weakly and may vanish between the emptiness
            # check and the pop; in that case nothing is left to clean.
            break
        worker.stop()
        threads_to_wait_for.append(worker)
    while threads_to_wait_for:
        worker = threads_to_wait_for.pop()
        try:
            worker.join()
        except RuntimeError:
            # join() refuses threads that were never started as well as
            # the current thread; there is nothing to wait for in either
            # case, and one such worker must not abort the whole cleanup.
            pass
        finally:
            # Do not keep the worker alive through this frame.
            del worker
# Run _clean_up() at interpreter exit so registered workers are stopped and
# joined.
atexit.register(_clean_up)
|