1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
import asyncio
import warnings
def _await(coro):
"""Create a new event loop, run the coroutine, then close the event loop."""
loop = asyncio.new_event_loop()
with warnings.catch_warnings(record=True) as warns:
ret = loop.run_until_complete(coro)
loop.close()
# Filter out Python 3.13+ RuntimeWarnings about unawaited coroutines
# from async generators that are broken out of (e.g. _filesync_read_until)
warns = [w for w in warns if not (issubclass(w.category, RuntimeWarning) and "was never awaited" in str(w.message))]
if warns:
raise RuntimeError
return ret
def awaiter(func):
def sync_func(*args, **kwargs):
return _await(func(*args, **kwargs))
return sync_func
|