File: aiothreads.py

package info (click to toggle)
python-pytray 0.3.5-4
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 192 kB
  • sloc: python: 510; sh: 30; makefile: 3
file content (306 lines) | stat: -rw-r--r-- 10,234 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
# -*- coding: utf-8 -*-
"""
A module to create interoperability between concurrent threads and asyncio.

An asyncio event loop can be running on one thread while coroutines are scheduled on it
from a different thread.  The result is returned as a concurrent future which can be
waited on.
"""
import asyncio
import concurrent.futures
from concurrent.futures import Future as ThreadFuture
from contextlib import contextmanager
from functools import partial
import logging
import sys
import threading
import typing
from typing import Callable

from . import futures

# Names exported by ``from <module> import *``
__all__ = ("LoopScheduler",)

# Module-level logger, named after this module
_LOGGER = logging.getLogger(__name__)


def aio_future_chain_thread(aio_future: asyncio.Future, future: ThreadFuture):
    """Chain an asyncio future to a thread future.

    When ``aio_future`` completes, its outcome (result, exception or cancellation) is
    copied onto ``future``.  If the result of the asyncio future is another aio future
    this will also be chained so the client only sees thread futures.

    If ``future`` has already been cancelled by the time ``aio_future`` completes, the
    outcome is dropped because there is nobody left to deliver it to.

    :param aio_future: the asyncio future whose completion is observed
    :param future: the thread future that receives the outcome
    :return: ``future``, so the call can be used inline
    """

    def done(done_future: asyncio.Future):
        # Here we're on the aio thread
        if future.cancelled():
            # The client already gave up on the thread future.  Trying to set a result
            # or an exception on it would raise InvalidStateError inside this loop
            # callback, so there is nothing to copy.
            return

        # Copy over the future
        try:
            result = done_future.result()
            if asyncio.isfuture(result):
                # Change the aio future to a thread future
                fut = ThreadFuture()
                aio_future_chain_thread(result, fut)
                result = fut

            future.set_result(result)
        except asyncio.CancelledError:
            future.cancel()
        except Exception as exception:  # pylint: disable=broad-except
            future.set_exception(exception)

    aio_future.add_done_callback(done)
    return future


def thread_future_chain_aio(future: ThreadFuture, aio_future: asyncio.Future):
    """Chain a thread future to an asyncio future.

    When ``future`` completes, its outcome (result, exception or cancellation) is
    applied to ``aio_future`` on the event loop that ``aio_future`` belongs to.  If the
    result of the thread future is another thread future this will also be chained so
    the client only sees aio futures.

    If ``aio_future`` has been cancelled by the time the outcome reaches the loop, the
    outcome is dropped.

    :param future: the thread future whose completion is observed
    :param aio_future: the asyncio future that receives the outcome
    :return: ``aio_future``, so the call can be used inline
    """
    loop = aio_future._loop  # pylint: disable=protected-access

    def apply(setter, *args):
        # Runs on the loop thread.  The aio future may have been cancelled between the
        # thread future completing and this callback running; calling set_result or
        # set_exception on it would then raise InvalidStateError inside the loop.
        if not aio_future.cancelled():
            setter(*args)

    def done(done_future: ThreadFuture):
        # Called on whichever thread completes (or cancels) the thread future, hence all
        # mutations of the aio future are handed to the loop with call_soon_threadsafe
        try:
            result = done_future.result()
            if isinstance(result, ThreadFuture):
                # Change the thread future to an aio future
                fut = loop.create_future()
                thread_future_chain_aio(result, fut)
                result = fut

            loop.call_soon_threadsafe(apply, aio_future.set_result, result)
        except concurrent.futures.CancelledError:
            loop.call_soon_threadsafe(apply, aio_future.cancel)
        except Exception as exception:  # pylint: disable=broad-except
            loop.call_soon_threadsafe(apply, aio_future.set_exception, exception)

    future.add_done_callback(done)
    return aio_future


def aio_future_to_thread(aio_future: asyncio.Future):
    """Create a thread future that drives the given asyncio future.

    The link is one-way: completing or cancelling the returned thread future is forwarded
    to ``aio_future``, whereas changes made to ``aio_future`` are not reflected back on
    the thread future.

    :param aio_future: the asyncio future to be driven
    :return: a new thread future chained onto ``aio_future``
    """
    thread_side = ThreadFuture()
    thread_future_chain_aio(future=thread_side, aio_future=aio_future)
    return thread_side


class LoopScheduler:
    """Run an asyncio event loop on a background thread and schedule work on it.

    The loop thread is a daemon thread that is started lazily the first time work is
    submitted (or explicitly with :meth:`start`, or by entering the scheduler as a
    context manager) and is stopped with :meth:`stop` / :meth:`close` or on leaving the
    ``with`` block.  Results are handed back as ``concurrent.futures`` futures so that
    callers on other threads can block on them.
    """

    # Default number of seconds the blocking helpers wait for a result
    DEFAULT_TASK_TIMEOUT = 5.0

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop = None,
        name="AsyncioScheduler",
        timeout=DEFAULT_TASK_TIMEOUT,
    ):
        """
        :param loop: the event loop to run; a new one is created if none is supplied
        :param name: the name given to the loop thread
        :param timeout: seconds that :meth:`await_` and :meth:`run` wait for a result
        """
        self._loop = loop or asyncio.new_event_loop()
        self._name = name
        self.task_timeout = timeout
        self._asyncio_thread = None  # the loop thread, None while not running
        self._stop_signal = None  # aio future created on the loop thread, used by stop()
        self._closed = False

    def __enter__(self):
        self._ensure_running()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

    def loop(self):
        """Return the event loop driven by this scheduler."""
        return self._loop

    def is_closed(self) -> bool:
        """Return True once :meth:`close` has been called."""
        return self._closed

    def is_running(self):
        """Return True while the loop thread exists."""
        return self._asyncio_thread is not None

    def close(self):
        """Stop the loop thread (if running) and mark the scheduler as closed.

        Calling this more than once is harmless.
        """
        if self.is_closed():
            return
        self.stop()
        self._closed = True

    def start(self):
        """Start the loop thread and block until the loop is actually running.

        :raises RuntimeError: if the scheduler is already running
        :raises Exception: whatever prevented the loop from starting on its thread
        """
        if self._asyncio_thread is not None:
            raise RuntimeError("Already running")

        start_future = ThreadFuture()

        self._asyncio_thread = threading.Thread(
            target=self._run_loop, name=self._name, args=(start_future,), daemon=True
        )
        self._asyncio_thread.start()
        # Resolved from the loop thread, either with True or with the startup failure
        start_future.result()

    def stop(self):
        """Ask the loop thread to finish and wait until it has exited.

        Does nothing if the scheduler is not running.
        NOTE(review): this blocks the calling thread until the loop acknowledges the
        request, so it looks unsafe to call from the loop thread itself -- confirm.
        """
        # Save the thread because it will be set to None when it does stop
        aio_thread = self._asyncio_thread
        if aio_thread is None:
            return

        stop_future = ThreadFuture()
        # Send the stop signal
        self._loop.call_soon_threadsafe(
            partial(self._stop_signal.set_result, stop_future)
        )
        # Wait for the result in case there was an exception
        stop_future.result()
        aio_thread.join()

    def await_(self, awaitable: typing.Awaitable, *, name: str = None):
        """
        Await an awaitable on the event loop and return the result.  It may take a little time for
        the loop to get around to scheduling it, so the wait is limited to ``self.task_timeout``
        seconds.

        :param awaitable: the coroutine to run
        :param name: an optional name for the awaitable to aid with debugging.  If no name is
            supplied will attempt to use `awaitable.__name__`.
        :return: the result of running the coroutine
        :raises concurrent.futures.TimeoutError: if no result is available in time; the message
            carries the name of the awaitable
        """
        try:
            return self.await_submit(awaitable).result(timeout=self.task_timeout)
        except concurrent.futures.TimeoutError as exc:
            # Try to get a reasonable name for the awaitable
            name = name or getattr(awaitable, "__name__", "Awaitable")
            raise concurrent.futures.TimeoutError(
                "{} after {} seconds".format(name, self.task_timeout)
            ) from exc

    def await_submit(self, awaitable: typing.Awaitable) -> ThreadFuture:
        """
        Schedule an awaitable on the loop and return the corresponding future.

        Starts the loop thread if necessary.  If awaiting yields an asyncio future, that
        future is chained to a thread future which becomes the result instead.

        :param awaitable: the awaitable to schedule
        :return: a thread future for the outcome of the awaitable
        """

        async def coro():
            res = await awaitable
            if asyncio.isfuture(res):
                # Clients live on other threads, so only hand out thread futures
                future = ThreadFuture()
                aio_future_chain_thread(res, future)
                return future

            return res

        self._ensure_running()
        return asyncio.run_coroutine_threadsafe(coro(), loop=self._loop)

    def run(self, func, *args, **kwargs):
        """
        Run a function on the event loop and return the result.  It may take a little time for the
        loop to get around to scheduling it, so the wait is limited to ``self.task_timeout``
        seconds.

        :param func: the callable to run on the loop thread
        :param args: positional arguments passed to ``func``
        :param kwargs: keyword arguments passed to ``func``
        :return: the value returned by ``func``
        :raises concurrent.futures.TimeoutError: if no result is available in time
        """
        return self.submit(func, *args, **kwargs).result(timeout=self.task_timeout)

    def submit(self, func: Callable, *args, **kwargs) -> ThreadFuture:
        """
        Schedule a function on the loop and return the corresponding future.

        Starts the loop thread if necessary.  Cancelling the returned future before the
        loop has run the call also cancels the pending loop callback.

        :param func: the callable to run on the loop thread
        :return: a thread future for the value returned by ``func``
        """
        self._ensure_running()

        future = ThreadFuture()

        def callback():
            # Runs on the loop thread
            if not future.cancelled():
                # presumably forwards exceptions raised in this body onto ``future``;
                # see futures.capture_exceptions
                with futures.capture_exceptions(future):
                    result = func(*args, **kwargs)
                    if asyncio.isfuture(result):
                        result = aio_future_to_thread(result)

                    future.set_result(result)

        handle = self._loop.call_soon_threadsafe(callback)

        def handle_cancel(done_future: ThreadFuture):
            """Function to propagate a cancellation of the concurrent future up to the loop
            callback"""
            if done_future.cancelled():
                self._loop.call_soon_threadsafe(handle.cancel)

        future.add_done_callback(handle_cancel)

        return future

    @contextmanager
    def async_ctx(self, ctx_manager: typing.AsyncContextManager):
        """Can be used to turn an async context manager into a synchronous one.

        ``__aenter__`` and ``__aexit__`` are awaited on the event loop while the body of
        the ``with`` statement runs on the calling thread.  The loop thread is started if
        necessary.  As with a regular ``with`` statement, ``__aexit__`` is invoked for any
        exception leaving the body and may suppress it by returning a true value.

        :param ctx_manager: the asynchronous context manager to enter
        """
        aexit = ctx_manager.__aexit__
        aenter = ctx_manager.__aenter__

        # The loop has to be running, otherwise the blocking wait below never returns
        self._ensure_running()
        result = asyncio.run_coroutine_threadsafe(aenter(), loop=self._loop).result()
        # Make sure that if we got a future, we convert it appropriately
        if asyncio.isfuture(result):
            result = aio_future_to_thread(result)
        try:
            yield result
        except BaseException:  # pylint: disable=broad-except
            # BaseException rather than Exception so that e.g. KeyboardInterrupt in the
            # body still exits the context
            if not self.await_(aexit(*sys.exc_info())):
                raise
        else:
            self.await_(aexit(None, None, None))

    @contextmanager
    def ctx(self, ctx_manager: typing.ContextManager):
        """Can be used to enter a context on the event loop.

        ``__enter__`` and ``__exit__`` are called on the loop thread while the body of the
        ``with`` statement runs on the calling thread.  As with a regular ``with``
        statement, ``__exit__`` is invoked for any exception leaving the body and may
        suppress it by returning a true value.

        :param ctx_manager: the context manager to enter on the loop thread
        """
        ctx_exit = ctx_manager.__exit__
        ctx_enter = ctx_manager.__enter__

        result = self.run(ctx_enter)
        try:
            yield result
        except BaseException:  # pylint: disable=broad-except
            # BaseException rather than Exception so that e.g. KeyboardInterrupt in the
            # body still exits the context
            if not self.run(ctx_exit, *sys.exc_info()):
                raise
        else:
            self.run(ctx_exit, None, None, None)

    def async_iter(self, aiterable: typing.AsyncIterable):
        """Iterate an async iterable from this thread.

        Each item is obtained by awaiting ``__anext__`` on the event loop, so every step
        is subject to ``self.task_timeout``.

        :param aiterable: the asynchronous iterable to consume
        :return: a generator yielding the items of ``aiterable``
        """
        iterator = aiterable.__aiter__()
        running = True
        while running:
            try:
                target = self.await_(iterator.__anext__())
            except StopAsyncIteration:
                running = False
            else:
                yield target

    def _ensure_running(self):
        """Start the loop thread unless it is already running."""
        if self._asyncio_thread is not None:
            return
        self.start()

    def _run_loop(self, start_future):
        """Thread target: run the event loop until :meth:`stop` sends the stop signal.

        Here we are on the aio thread.

        :param start_future: thread future that :meth:`start` blocks on; it is resolved
            with True once the loop is running, or with the exception that prevented
            the loop from starting
        """
        main = None
        try:
            _LOGGER.debug(
                "Starting event loop (id %s) on %s",
                id(self._loop),
                threading.current_thread(),
            )

            asyncio.set_event_loop(self._loop)
            self._stop_signal = self._loop.create_future()

            async def run_loop():
                start_future.set_result(True)

                # Wait to stop
                stop_future = await self._stop_signal
                stop_future.set_result(True)

            main = run_loop()
            self._loop.run_until_complete(main)
        except BaseException as exc:  # pylint: disable=broad-except
            if main is not None:
                # Avoids a "never awaited" warning if the loop refused to run it
                main.close()
            # The thread is about to die, so the scheduler must not report it as running
            self._asyncio_thread = None
            if start_future.done():
                raise
            # start() is still blocked waiting for us: hand it the failure rather than
            # leaving it to wait forever
            start_future.set_exception(exc)
        else:
            # The loop is finished
            self._asyncio_thread = None

            _LOGGER.debug("Event loop stopped on %s", threading.current_thread())
        finally:
            asyncio.set_event_loop(None)