1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
try:
import asyncio
from asyncio.test_utils import run_once as _run_once
def run_once():
return _run_once(asyncio.get_event_loop())
except ImportError as e:
try:
import trollius as asyncio
except ImportError:
asyncio = None
def run_once():
'''
copied from asyncio.testutils because trollius has no
"testutils"
'''
# in Twisted, this method is a no-op
if asyncio is None:
return
# just like modern asyncio.testutils.run_once does it...
loop = asyncio.get_event_loop()
loop.stop()
loop.run_forever()
asyncio.gather(*asyncio.Task.all_tasks())
try:
# XXX fixme hack better way to detect twisted
# (has to work on py3 where asyncio exists always, though)
import twisted # noqa
def await(_):
return
except ImportError:
def await(future):
asyncio.get_event_loop().run_until_complete(future)
|