File: asyncio.py

package info (click to toggle)
python-eventlet 0.40.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,200 kB
  • sloc: python: 25,101; sh: 78; makefile: 31
file content (174 lines) | stat: -rw-r--r-- 5,961 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
Asyncio-based hub, originally implemented by Miguel Grinberg.
"""

# The various modules involved in asyncio need to call the original, unpatched
# standard library APIs to work: socket, select, threading, and so on. We
# therefore don't import them on the module level, since that would involve
# their imports getting patched, and instead delay importing them as much as
# possible. Then, we do a little song and dance in Hub.__init__ below so that
# when they're imported they import the original modules (select, socket, etc)
# rather than the patched ones.

import os
import sys

from eventlet.hubs import hub
from eventlet.patcher import _unmonkey_patch_asyncio_all


def is_available():
    """
    Indicate whether this hub is available, since some hubs are
    platform-specific.

    Python always has asyncio, so this is always ``True``.
    """
    return True


class Hub(hub.BaseHub):
    """An Eventlet hub implementation on top of an asyncio event loop."""

    def __init__(self):
        super().__init__()

        # Pre-emptively make sure we're using the right modules:
        _unmonkey_patch_asyncio_all()

        # The presumption is that eventlet is driving the event loop, so we
        # want a new one we control.
        import asyncio

        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.sleep_event = asyncio.Event()

        import asyncio.events
        if hasattr(asyncio.events, "on_fork"):
            # Allow post-fork() child to continue using the same event loop.
            # This is a terrible idea.
            asyncio.events.on_fork.__code__ = (lambda: None).__code__
        else:
            # On Python 3.9-3.11, there's a thread local we need to reset.
            # Also a terrible idea.
            def re_register_loop(loop=self.loop):
                asyncio.events._set_running_loop(loop)

            os.register_at_fork(after_in_child=re_register_loop)

    def add_timer(self, timer):
        """
        Register a ``Timer``.

        Typically not called directly by users.
        """
        super().add_timer(timer)
        self.sleep_event.set()

    def _file_cb(self, cb, fileno):
        """
        Callback called by ``asyncio`` when a file descriptor has an event.
        """
        try:
            cb(fileno)
        except self.SYSTEM_EXCEPTIONS:
            raise
        except:
            self.squelch_exception(fileno, sys.exc_info())
        self.sleep_event.set()

    def add(self, evtype, fileno, cb, tb, mark_as_closed):
        """
        Add a file descriptor of given event type to the ``Hub``.  See the
        superclass for details.

        Typically not called directly by users.
        """
        try:
            os.fstat(fileno)
        except OSError:
            raise ValueError("Invalid file descriptor")
        already_listening = self.listeners[evtype].get(fileno) is not None
        listener = super().add(evtype, fileno, cb, tb, mark_as_closed)
        if not already_listening:
            if evtype == hub.READ:
                self.loop.add_reader(fileno, self._file_cb, cb, fileno)
            else:
                self.loop.add_writer(fileno, self._file_cb, cb, fileno)
        return listener

    def remove(self, listener):
        """
        Remove a listener from the ``Hub``.  See the superclass for details.

        Typically not called directly by users.
        """
        super().remove(listener)
        evtype = listener.evtype
        fileno = listener.fileno
        if not self.listeners[evtype].get(fileno):
            if evtype == hub.READ:
                self.loop.remove_reader(fileno)
            else:
                self.loop.remove_writer(fileno)

    def remove_descriptor(self, fileno):
        """
        Remove a file descriptor from the ``asyncio`` loop.

        Typically not called directly by users.
        """
        have_read = self.listeners[hub.READ].get(fileno)
        have_write = self.listeners[hub.WRITE].get(fileno)
        super().remove_descriptor(fileno)
        if have_read:
            self.loop.remove_reader(fileno)
        if have_write:
            self.loop.remove_writer(fileno)

    def run(self, *a, **kw):
        """
        Start the ``Hub`` running. See the superclass for details.
        """
        import asyncio

        async def async_run():
            if self.running:
                raise RuntimeError("Already running!")
            try:
                self.running = True
                self.stopping = False
                while not self.stopping:
                    while self.closed:
                        # We ditch all of these first.
                        self.close_one()
                    self.prepare_timers()
                    if self.debug_blocking:
                        self.block_detect_pre()
                    self.fire_timers(self.clock())
                    if self.debug_blocking:
                        self.block_detect_post()
                    self.prepare_timers()
                    wakeup_when = self.sleep_until()
                    if wakeup_when is None:
                        sleep_time = self.default_sleep()
                    else:
                        sleep_time = wakeup_when - self.clock()
                    if sleep_time > 0:
                        try:
                            await asyncio.wait_for(self.sleep_event.wait(), sleep_time)
                        except asyncio.TimeoutError:
                            pass
                        self.sleep_event.clear()
                    else:
                        await asyncio.sleep(0)
                else:
                    self.timers_canceled = 0
                    del self.timers[:]
                    del self.next_timers[:]
            finally:
                self.running = False
                self.stopping = False

        self.loop.run_until_complete(async_run())