
|
# © 2014 Mark Harviston <mark.harviston@gmail.com>
# © 2014 Arve Knudsen <arve.knudsen@gmail.com>
# BSD License
"""Windows specific Quamash functionality."""
import asyncio
import sys
try:
import _winapi
from asyncio import windows_events
import _overlapped
except ImportError: # noqa
pass # w/o guarding this import py.test can't gather doctests on platforms w/o _winapi
import math
from . import QtCore, _make_signaller
from ._common import with_logger
UINT32_MAX = 0xffffffff
class _ProactorEventLoop(asyncio.ProactorEventLoop):
"""Proactor based event loop."""
def __init__(self):
super().__init__(_IocpProactor())
self.__event_signaller = _make_signaller(QtCore, list)
self.__event_signal = self.__event_signaller.signal
self.__event_signal.connect(self._process_events)
self.__event_poller = _EventPoller(self.__event_signal)
def _process_events(self, events):
"""Process events from proactor."""
for f, callback, transferred, key, ov in events:
try:
self._logger.debug('Invoking event callback {}'.format(callback))
value = callback(transferred, key, ov)
except OSError:
self._logger.warning('Event callback failed', exc_info=sys.exc_info())
else:
f.set_result(value)
def _before_run_forever(self):
self.__event_poller.start(self._proactor)
def _after_run_forever(self):
self.__event_poller.stop()
@with_logger
class _IocpProactor(windows_events.IocpProactor):
def __init__(self):
self.__events = []
super(_IocpProactor, self).__init__()
self._lock = QtCore.QMutex()
def select(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
if not self.__events:
self._poll(timeout)
tmp = self.__events
self.__events = []
return tmp
def close(self):
self._logger.debug('Closing')
super(_IocpProactor, self).close()
def recv(self, conn, nbytes, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).recv(conn, nbytes, flags)
def send(self, conn, buf, flags=0):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).send(conn, buf, flags)
def _poll(self, timeout=None):
"""Override in order to handle events in a threadsafe manner."""
if timeout is None:
ms = UINT32_MAX # wait for eternity
elif timeout < 0:
raise ValueError("negative timeout")
else:
# GetQueuedCompletionStatus() has a resolution of 1 millisecond,
# round away from zero to wait *at least* timeout seconds.
ms = math.ceil(timeout * 1e3)
if ms >= UINT32_MAX:
raise ValueError("timeout too big")
with QtCore.QMutexLocker(self._lock):
while True:
# self._logger.debug('Polling IOCP with timeout {} ms in thread {}...'.format(
# ms, threading.get_ident()))
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
if status is None:
break
err, transferred, key, address = status
try:
f, ov, obj, callback = self._cache.pop(address)
except KeyError:
# key is either zero, or it is used to return a pipe
# handle which should be closed to avoid a leak.
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
_winapi.CloseHandle(key)
ms = 0
continue
if obj in self._stopped_serving:
f.cancel()
# Futures might already be resolved or cancelled
elif not f.done():
self.__events.append((f, callback, transferred, key, ov))
ms = 0
def _wait_for_handle(self, handle, timeout, _is_cancel):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self)._wait_for_handle(handle, timeout, _is_cancel)
def accept(self, listener):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).accept(listener)
def connect(self, conn, address):
with QtCore.QMutexLocker(self._lock):
return super(_IocpProactor, self).connect(conn, address)
@with_logger
class _EventWorker(QtCore.QThread):
def __init__(self, proactor, parent):
super().__init__()
self.__stop = False
self.__proactor = proactor
self.__sig_events = parent.sig_events
self.__semaphore = QtCore.QSemaphore()
def start(self):
super().start()
self.__semaphore.acquire()
def stop(self):
self.__stop = True
# Wait for thread to end
self.wait()
def run(self):
self._logger.debug('Thread started')
self.__semaphore.release()
while not self.__stop:
events = self.__proactor.select(0.01)
if events:
self._logger.debug('Got events from poll: {}'.format(events))
self.__sig_events.emit(events)
self._logger.debug('Exiting thread')
@with_logger
class _EventPoller:
"""Polling of events in separate thread."""
def __init__(self, sig_events):
self.sig_events = sig_events
def start(self, proactor):
self._logger.debug('Starting (proactor: {})...'.format(proactor))
self.__worker = _EventWorker(proactor, self)
self.__worker.start()
def stop(self):
self._logger.debug('Stopping worker thread...')
self.__worker.stop()
|