File: async_wrapper.py

package info (click to toggle)
python-adb-shell 0.4.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 792 kB
  • sloc: python: 3,861; makefile: 191; sh: 124
file content (28 lines) | stat: -rw-r--r-- 771 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import functools
import warnings



def _await(coro):
    """Run a coroutine to completion on a fresh event loop and return its result.

    A new event loop is created for every call and is always closed before
    this function returns or raises, including when the coroutine itself
    raises. Warnings emitted while the coroutine runs (and while the loop is
    being closed) are recorded instead of being shown.

    Parameters
    ----------
    coro : coroutine
        The coroutine (or other awaitable accepted by
        ``loop.run_until_complete``) to run.

    Returns
    -------
    object
        Whatever the coroutine returns.

    Raises
    ------
    RuntimeError
        If any warning was recorded, other than a ``RuntimeWarning`` whose
        message contains "was never awaited". The message lists the
        offending warnings.
    Exception
        Any exception raised by the coroutine propagates unchanged.

    """
    loop = asyncio.new_event_loop()

    with warnings.catch_warnings(record=True) as warns:
        try:
            ret = loop.run_until_complete(coro)
        finally:
            # Close the loop even if the coroutine raised, so a failing call
            # does not leak an open event loop.  Closing stays inside the
            # ``catch_warnings`` context so warnings emitted during close are
            # recorded as well.
            loop.close()

        # Filter out Python 3.13+ RuntimeWarnings about unawaited coroutines
        # from async generators that are broken out of (e.g. _filesync_read_until)
        unexpected = [
            w
            for w in warns
            if not (issubclass(w.category, RuntimeWarning) and "was never awaited" in str(w.message))
        ]

        if unexpected:
            # Name the offending warnings so the failure can be diagnosed.
            details = "; ".join("{}: {}".format(w.category.__name__, w.message) for w in unexpected)
            raise RuntimeError("Unexpected warning(s) while running coroutine: {}".format(details))

        return ret


def awaiter(func):
    """Decorator that turns a coroutine function into a synchronous function.

    Parameters
    ----------
    func : callable
        A callable whose return value is passed to ``_await``.

    Returns
    -------
    callable
        A synchronous function that calls ``func`` with the given arguments,
        runs the result via ``_await`` and returns what ``_await`` returns.
        The wrapper keeps the name and docstring of ``func``.

    """

    # Preserve ``__name__``/``__doc__`` so wrapped functions are reported under
    # their own names rather than as ``sync_func``.
    @functools.wraps(func)
    def sync_func(*args, **kwargs):
        return _await(func(*args, **kwargs))

    return sync_func