1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
|
# © 2014 Mark Harviston <mark.harviston@gmail.com>
# © 2014 Arve Knudsen <arve.knudsen@gmail.com>
# BSD License
"""Windows specific Quamash functionality."""
import asyncio
import sys
try:
import _winapi
from asyncio import windows_events
import _overlapped
except ImportError: # noqa
pass # w/o guarding this import py.test can't gather doctests on platforms w/o _winapi
import math
from . import QtCore, _make_signaller
from ._common import with_logger
UINT32_MAX = 0xffffffff
class _ProactorEventLoop(asyncio.ProactorEventLoop):
"""Proactor based event loop."""
def __init__(self):
super().__init__(_IocpProactor())
self.__event_signaller = _make_signaller(QtCore, list)
self.__event_signal = self.__event_signaller.signal
self.__event_signal.connect(self._process_events)
self.__event_poller = _EventPoller(self.__event_signal)
def _process_events(self, events):
"""Process events from proactor."""
for f, callback, transferred, key, ov in events:
try:
self._logger.debug('Invoking event callback {}'.format(callback))
value = callback(transferred, key, ov)
except OSError:
self._logger.warning('Event callback failed', exc_info=sys.exc_info())
else:
f.set_result(value)
def _before_run_forever(self):
self.__event_poller.start(self._proactor)
def _after_run_forever(self):
self.__event_poller.stop()
@with_logger
class _IocpProactor(windows_events.IocpProactor):
def __init__(self):
self.__events = []
super(_IocpProactor, self).__init__()
self._lock = QtCore.QMutex()
def select(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
if not self.__events:
self._poll(timeout)
tmp = self.__events
self.__events = []
return tmp
def close(self):
self._logger.debug('Closing')
super(_IocpProactor, self).close()
def recv(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv(conn, nbytes, flags)
def send(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).send(conn, buf, flags)
def _poll(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
if timeout is None:
ms = UINT32_MAX # wait for eternity
elif timeout < 0:
raise ValueError("negative timeout")
else:
# GetQueuedCompletionStatus() has a resolution of 1 millisecond,
# round away from zero to wait *at least* timeout seconds.
ms = math.ceil(timeout * 1e3)
if ms >= UINT32_MAX:
raise ValueError("timeout too big")
with QtCore.QMutexLocker(self._lock):
while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
ms = 0
continue
if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))
ms = 0
def _wait_for_handle(self, handle, timeout, _is_cancel):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self)._wait_for_handle(handle, timeout, _is_cancel)
def accept(self, listener):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept(listener)
def connect(self, conn, address):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).connect(conn, address)
@with_logger
class _EventWorker(QtCore.QThread):
def __init__(self, proactor, parent):
super().__init__()
self.__stop = False
self.__proactor = proactor
self.__sig_events = parent.sig_events
self.__semaphore = QtCore.QSemaphore()
def start(self):
super().start()
self.__semaphore.acquire()
def stop(self):
self.__stop = True
# Wait for thread to end
self.wait()
def run(self):
self._logger.debug('Thread started')
self.__semaphore.release()
while not self.__stop:
events = self.__proactor.select(0.01)
if events:
self._logger.debug('Got events from poll: {}'.format(events))
self.__sig_events.emit(events)
self._logger.debug('Exiting thread')
@with_logger
class _EventPoller:
"""Polling of events in separate thread."""
def __init__(self, sig_events):
self.sig_events = sig_events
def start(self, proactor):
self._logger.debug('Starting (proactor: {})...'.format(proactor))
self.__worker = _EventWorker(proactor, self)
self.__worker.start()
def stop(self):
self._logger.debug('Stopping worker thread...')
self.__worker.stop()
|