1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
# Test asyncio.wait_for, with dependent tasks
# https://github.com/micropython/micropython/issues/16759
try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit
# CPython 3.12 deprecated calling get_event_loop() when there is no current event
# loop, so to make this test run on CPython requires setting the event loop.
if hasattr(asyncio, "set_event_loop"):
asyncio.set_event_loop(asyncio.new_event_loop())
class Worker:
def __init__(self):
self._eventLoop = None
self._tasks = []
def launchTask(self, asyncJob):
if self._eventLoop is None:
self._eventLoop = asyncio.get_event_loop()
return self._eventLoop.create_task(asyncJob)
async def job(self, prerequisite, taskName):
if prerequisite:
await prerequisite
await asyncio.sleep(0.1)
print(taskName, "work completed")
def planTasks(self):
self._tasks.append(self.launchTask(self.job(None, "task0")))
self._tasks.append(self.launchTask(self.job(self._tasks[0], "task1")))
self._tasks.append(self.launchTask(self.job(self._tasks[1], "task2")))
async def waitForTask(self, taskIdx):
return await self._tasks[taskIdx]
def syncWaitForTask(self, taskIdx):
return self._eventLoop.run_until_complete(self._tasks[taskIdx])
async def async_test():
print("--- async test")
worker = Worker()
worker.planTasks()
await worker.waitForTask(0)
print("-> task0 done")
await worker.waitForTask(2)
print("-> task2 done")
def sync_test():
print("--- sync test")
worker = Worker()
worker.planTasks()
worker.syncWaitForTask(0)
print("-> task0 done")
worker.syncWaitForTask(2)
print("-> task2 done")
asyncio.get_event_loop().run_until_complete(async_test())
sync_test()
|