"""
Asyncio-based hub, originally implemented by Miguel Grinberg.
"""
# The various modules involved in asyncio need to call the original, unpatched
# standard library APIs to work: socket, select, threading, and so on. We
# therefore don't import them on the module level, since that would involve
# their imports getting patched, and instead delay importing them as much as
# possible. Then, we do a little song and dance in Hub.__init__ below so that
# when they're imported they import the original modules (select, socket, etc)
# rather than the patched ones.
import os
import sys
from eventlet.hubs import hub
from eventlet.patcher import _unmonkey_patch_asyncio_all
def is_available():
    """
    Indicate whether this hub is available, since some hubs are
    platform-specific.

    Python always has asyncio, so this is always ``True``.

    :return: ``True``, unconditionally.
    """
    return True
class Hub(hub.BaseHub):
    """An Eventlet hub implementation on top of an asyncio event loop."""

    def __init__(self):
        """
        Create the hub together with a new asyncio event loop stored on
        ``self.loop`` and installed as the current event loop.
        """
        super().__init__()

        # Pre-emptively make sure we're using the right modules:
        _unmonkey_patch_asyncio_all()

        # The presumption is that eventlet is driving the event loop, so we
        # want a new one we control.
        #
        # NOTE: asyncio is imported here rather than at module level on
        # purpose; it must only be imported after the un-patching call above.
        import asyncio

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        # Set by ``add_timer()`` and ``_file_cb()``; ``run()`` waits on it
        # while idle and clears it after waking up.
        self.sleep_event = asyncio.Event()

        import asyncio.events

        if hasattr(asyncio.events, "on_fork"):
            # Allow post-fork() child to continue using the same event loop.
            # This is a terrible idea.
            asyncio.events.on_fork.__code__ = (lambda: None).__code__
        else:
            # On Python 3.9-3.11, there's a thread local we need to reset.
            # Also a terrible idea.
            def re_register_loop(loop=self.loop):
                asyncio.events._set_running_loop(loop)

            os.register_at_fork(after_in_child=re_register_loop)

    def add_timer(self, timer):
        """
        Register a ``Timer``.

        Typically not called directly by users.

        :param timer: the timer, passed through to the superclass.
        """
        super().add_timer(timer)
        # Wake ``run()`` if it is currently waiting on the event, so that its
        # loop goes around again and re-evaluates the timers.
        self.sleep_event.set()

    def _file_cb(self, cb, fileno):
        """
        Callback called by ``asyncio`` when a file descriptor has an event.

        :param cb: the listener callback; it is called with ``fileno``.
        :param fileno: the file descriptor that has an event.
        """
        try:
            cb(fileno)
        except self.SYSTEM_EXCEPTIONS:
            raise
        except BaseException:
            # Deliberately as broad as a bare ``except:``: anything that is
            # not one of ``SYSTEM_EXCEPTIONS`` is handed to
            # ``squelch_exception`` rather than re-raised from here.
            self.squelch_exception(fileno, sys.exc_info())
        # Wake ``run()`` if it is waiting, now that a callback has run.
        self.sleep_event.set()

    def add(self, evtype, fileno, cb, tb, mark_as_closed):
        """
        Add a file descriptor of given event type to the ``Hub``. See the
        superclass for details.

        Typically not called directly by users.

        :param evtype: ``hub.READ`` selects the loop's reader registration;
            any other value selects the writer registration.
        :param fileno: the file descriptor to watch.
        :param cb: callback invoked with ``fileno`` when there is an event.
        :param tb: passed through to the superclass.
        :param mark_as_closed: passed through to the superclass.
        :return: the listener object returned by the superclass.
        :raises ValueError: if ``os.fstat(fileno)`` fails with ``OSError``.
        """
        try:
            os.fstat(fileno)
        except OSError as err:
            # Chain explicitly so the underlying OS error stays visible as
            # the cause of the ValueError.
            raise ValueError("Invalid file descriptor") from err

        # This has to be checked before the superclass records the new
        # listener.
        already_listening = self.listeners[evtype].get(fileno) is not None
        listener = super().add(evtype, fileno, cb, tb, mark_as_closed)
        if not already_listening:
            # Only the first listener for this (evtype, fileno) pair is
            # registered with the asyncio loop.
            if evtype == hub.READ:
                self.loop.add_reader(fileno, self._file_cb, cb, fileno)
            else:
                self.loop.add_writer(fileno, self._file_cb, cb, fileno)
        return listener

    def remove(self, listener):
        """
        Remove a listener from the ``Hub``. See the superclass for details.

        Typically not called directly by users.

        :param listener: the listener to remove; its ``evtype`` and
            ``fileno`` attributes are read.
        """
        super().remove(listener)
        evtype = listener.evtype
        fileno = listener.fileno
        # Only unregister from the asyncio loop once the hub has no listener
        # left for this (evtype, fileno) pair.
        if not self.listeners[evtype].get(fileno):
            if evtype == hub.READ:
                self.loop.remove_reader(fileno)
            else:
                self.loop.remove_writer(fileno)

    def remove_descriptor(self, fileno):
        """
        Remove a file descriptor from the ``asyncio`` loop.

        Typically not called directly by users.

        :param fileno: the file descriptor to stop watching.
        """
        # Looked up before delegating: the superclass call is presumably what
        # drops the hub's own records for this descriptor.
        have_read = self.listeners[hub.READ].get(fileno)
        have_write = self.listeners[hub.WRITE].get(fileno)
        super().remove_descriptor(fileno)
        if have_read:
            self.loop.remove_reader(fileno)
        if have_write:
            self.loop.remove_writer(fileno)

    def run(self, *a, **kw):
        """
        Start the ``Hub`` running. See the superclass for details.

        Positional and keyword arguments are accepted but not used here.

        :raises RuntimeError: if ``self.running`` is already true.
        """
        import asyncio

        async def async_run():
            if self.running:
                raise RuntimeError("Already running!")
            try:
                self.running = True
                self.stopping = False
                while not self.stopping:
                    while self.closed:
                        # We ditch all of these first.
                        self.close_one()
                    self.prepare_timers()
                    if self.debug_blocking:
                        self.block_detect_pre()
                    self.fire_timers(self.clock())
                    if self.debug_blocking:
                        self.block_detect_post()
                    self.prepare_timers()
                    wakeup_when = self.sleep_until()
                    if wakeup_when is None:
                        sleep_time = self.default_sleep()
                    else:
                        sleep_time = wakeup_when - self.clock()
                    if sleep_time > 0:
                        # Wait until the event is set, but for no longer than
                        # ``sleep_time``; a timeout is not an error.
                        try:
                            await asyncio.wait_for(
                                self.sleep_event.wait(), sleep_time
                            )
                        except asyncio.TimeoutError:
                            pass
                        self.sleep_event.clear()
                    else:
                        # Nothing to wait for: yield to the asyncio loop once
                        # and go around again.
                        await asyncio.sleep(0)
                else:
                    # Reached when the loop ends because ``stopping`` became
                    # true (the loop has no ``break``): discard all timers.
                    self.timers_canceled = 0
                    del self.timers[:]
                    del self.next_timers[:]
            finally:
                self.running = False
                self.stopping = False

        self.loop.run_until_complete(async_run())
|