1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
"""Bridges between the `asyncio` module and Tornado IOLoop.
This is a work in progress and interfaces are subject to change.
To test:
python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop
python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOMainLoop
(the tests log a few warnings with AsyncIOMainLoop because they leave some
unfinished callbacks on the event loop that fail when it resumes)
"""
from __future__ import absolute_import, division, print_function, with_statement
import datetime
import functools
import os
from tornado.ioloop import IOLoop
from tornado import stack_context
try:
# Import the real asyncio module for py33+ first. Older versions of the
# trollius backport also use this name.
import asyncio
except ImportError as e:
# Asyncio itself isn't available; see if trollius is (backport to py26+).
try:
import trollius as asyncio
except ImportError:
# Re-raise the original asyncio error, not the trollius one.
raise e
class BaseAsyncIOLoop(IOLoop):
    """Tornado ``IOLoop`` implementation that delegates to an asyncio loop.

    File-descriptor handlers, timeouts and callbacks registered through the
    ``IOLoop`` interface are forwarded to the wrapped ``asyncio_loop``.
    Subclasses choose which asyncio loop is wrapped and whether it is closed
    together with this object.
    """

    def initialize(self, asyncio_loop, close_loop=False):
        """Bind this IOLoop to ``asyncio_loop`` and reset bookkeeping.

        :arg asyncio_loop: the asyncio event loop that does the real work.
        :arg bool close_loop: if true, `close` also closes ``asyncio_loop``.
        """
        self.asyncio_loop = asyncio_loop
        self.close_loop = close_loop
        # ``make_current`` is scheduled on the loop rather than called
        # directly, so it runs once the asyncio loop starts processing
        # callbacks.
        self.asyncio_loop.call_soon(self.make_current)
        # Maps fd to handler function (as in IOLoop.add_handler)
        self.handlers = {}
        # Set of fds listening for reads/writes
        self.readers = set()
        self.writers = set()
        self.closing = False

    def close(self, all_fds=False):
        """Unregister every handler and optionally close fds and the loop.

        :arg bool all_fds: if true, ``os.close`` is called on every fd that
            had a handler registered.
        """
        self.closing = True
        # Iterate over a snapshot: remove_handler mutates self.handlers.
        for fd in list(self.handlers):
            self.remove_handler(fd)
            if all_fds:
                try:
                    os.close(fd)
                except OSError:
                    # Best-effort cleanup: one fd that cannot be closed must
                    # not prevent the remaining fds from being closed.
                    pass
        if self.close_loop:
            self.asyncio_loop.close()

    def add_handler(self, fd, handler, events):
        """Register ``handler`` for ``events`` on ``fd``.

        :arg fd: file descriptor to watch.
        :arg handler: callable invoked as ``handler(fd, events)``.
        :arg events: bitmask tested against ``IOLoop.READ`` / ``IOLoop.WRITE``.
        :raises ValueError: if ``fd`` already has a handler.
        """
        if fd in self.handlers:
            raise ValueError("fd %d added twice" % fd)
        self.handlers[fd] = stack_context.wrap(handler)
        if events & IOLoop.READ:
            self.asyncio_loop.add_reader(
                fd, self._handle_events, fd, IOLoop.READ)
            self.readers.add(fd)
        if events & IOLoop.WRITE:
            self.asyncio_loop.add_writer(
                fd, self._handle_events, fd, IOLoop.WRITE)
            self.writers.add(fd)

    def update_handler(self, fd, events):
        """Change the set of events watched on ``fd`` to ``events``.

        Readers/writers are only added or removed on the asyncio loop when
        the requested state differs from what ``self.readers`` /
        ``self.writers`` currently record.
        """
        if events & IOLoop.READ:
            if fd not in self.readers:
                self.asyncio_loop.add_reader(
                    fd, self._handle_events, fd, IOLoop.READ)
                self.readers.add(fd)
        else:
            if fd in self.readers:
                self.asyncio_loop.remove_reader(fd)
                self.readers.remove(fd)
        if events & IOLoop.WRITE:
            if fd not in self.writers:
                self.asyncio_loop.add_writer(
                    fd, self._handle_events, fd, IOLoop.WRITE)
                self.writers.add(fd)
        else:
            if fd in self.writers:
                self.asyncio_loop.remove_writer(fd)
                self.writers.remove(fd)

    def remove_handler(self, fd):
        """Stop watching ``fd``; a no-op if ``fd`` has no handler."""
        if fd not in self.handlers:
            return
        if fd in self.readers:
            self.asyncio_loop.remove_reader(fd)
            self.readers.remove(fd)
        if fd in self.writers:
            self.asyncio_loop.remove_writer(fd)
            self.writers.remove(fd)
        del self.handlers[fd]

    def _handle_events(self, fd, events):
        """Dispatch an asyncio reader/writer notification to fd's handler."""
        self.handlers[fd](fd, events)

    def start(self):
        """Run the wrapped asyncio loop until `stop` is called."""
        # _setup_logging is not defined in this class; presumably inherited
        # from IOLoop.
        self._setup_logging()
        self.asyncio_loop.run_forever()

    def stop(self):
        """Ask the wrapped asyncio loop to stop."""
        self.asyncio_loop.stop()

    def _run_callback(self, callback, *args, **kwargs):
        """Invoke ``callback``, routing any ``Exception`` to the error hook."""
        try:
            callback(*args, **kwargs)
        except Exception:
            # handle_callback_exception is not defined in this class;
            # presumably inherited from IOLoop.
            self.handle_callback_exception(callback)

    def add_timeout(self, deadline, callback):
        """Schedule ``callback`` to run at or after ``deadline``.

        :arg deadline: either a number (``int``/``float``) compared against
            ``self.time()``, or a ``datetime.timedelta`` giving a delay
            relative to now.
        :arg callback: callable invoked with no arguments.
        :returns: the handle returned by ``asyncio_loop.call_later``; pass it
            to `remove_timeout` to cancel.
        :raises TypeError: if ``deadline`` is neither a number nor a
            ``timedelta``.
        """
        if isinstance(deadline, (int, float)):
            # Deadlines already in the past are clamped to "run immediately".
            delay = max(deadline - self.time(), 0)
        elif isinstance(deadline, datetime.timedelta):
            delay = deadline.total_seconds()
        else:
            # Interpolate the value into the message. Wrapping it in a
            # one-element tuple keeps tuple-valued deadlines from being
            # unpacked as multiple format arguments.
            raise TypeError("Unsupported deadline %r" % (deadline,))
        return self.asyncio_loop.call_later(delay, self._run_callback,
                                            stack_context.wrap(callback))

    def remove_timeout(self, timeout):
        """Cancel a timeout; ``timeout`` is a handle from `add_timeout`."""
        timeout.cancel()

    def add_callback(self, callback, *args, **kwargs):
        """Schedule ``callback(*args, **kwargs)`` on the asyncio loop.

        Scheduling goes through ``asyncio_loop.call_soon_threadsafe``.

        :raises RuntimeError: if `close` has already been called.
        """
        if self.closing:
            raise RuntimeError("IOLoop is closing")
        if kwargs:
            # call_soon_threadsafe only forwards positional arguments, so
            # keyword arguments are bound up front with functools.partial.
            self.asyncio_loop.call_soon_threadsafe(functools.partial(
                self._run_callback, stack_context.wrap(callback),
                *args, **kwargs))
        else:
            self.asyncio_loop.call_soon_threadsafe(
                self._run_callback, stack_context.wrap(callback), *args)

    # Signal-handler entry point shares the add_callback implementation.
    add_callback_from_signal = add_callback
class AsyncIOMainLoop(BaseAsyncIOLoop):
    """IOLoop wrapping the loop returned by ``asyncio.get_event_loop()``.

    The wrapped loop is passed with ``close_loop=False``, so closing this
    IOLoop leaves the asyncio loop itself open.
    """

    def initialize(self):
        """Attach to the event loop reported by ``asyncio.get_event_loop()``."""
        existing_loop = asyncio.get_event_loop()
        base = super(AsyncIOMainLoop, self)
        base.initialize(existing_loop, close_loop=False)
class AsyncIOLoop(BaseAsyncIOLoop):
    """IOLoop backed by a freshly created asyncio event loop.

    The new loop is passed with ``close_loop=True``, so it is closed
    together with this IOLoop.
    """

    def initialize(self):
        """Create a new asyncio event loop and bind this IOLoop to it."""
        private_loop = asyncio.new_event_loop()
        base = super(AsyncIOLoop, self)
        base.initialize(private_loop, close_loop=True)
|