File: asyncio_wait_for_linked_task.py

package info (click to toggle)
micropython 1.26.1%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 50,196 kB
  • sloc: ansic: 324,551; python: 63,215; xml: 4,241; makefile: 3,618; sh: 1,586; javascript: 754; asm: 723; cpp: 83; exp: 11; pascal: 6
file content (66 lines) | stat: -rw-r--r-- 1,844 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Test asyncio.wait_for, with dependent tasks
# https://github.com/micropython/micropython/issues/16759

try:
    import asyncio
except ImportError:
    print("SKIP")
    raise SystemExit


# CPython 3.12 deprecated calling get_event_loop() when there is no current event
# loop, so to make this test run on CPython requires setting the event loop.
if hasattr(asyncio, "set_event_loop"):
    asyncio.set_event_loop(asyncio.new_event_loop())


class Worker:
    def __init__(self):
        self._eventLoop = None
        self._tasks = []

    def launchTask(self, asyncJob):
        if self._eventLoop is None:
            self._eventLoop = asyncio.get_event_loop()
        return self._eventLoop.create_task(asyncJob)

    async def job(self, prerequisite, taskName):
        if prerequisite:
            await prerequisite
        await asyncio.sleep(0.1)
        print(taskName, "work completed")

    def planTasks(self):
        self._tasks.append(self.launchTask(self.job(None, "task0")))
        self._tasks.append(self.launchTask(self.job(self._tasks[0], "task1")))
        self._tasks.append(self.launchTask(self.job(self._tasks[1], "task2")))

    async def waitForTask(self, taskIdx):
        return await self._tasks[taskIdx]

    def syncWaitForTask(self, taskIdx):
        return self._eventLoop.run_until_complete(self._tasks[taskIdx])


async def async_test():
    print("--- async test")
    worker = Worker()
    worker.planTasks()
    await worker.waitForTask(0)
    print("-> task0 done")
    await worker.waitForTask(2)
    print("-> task2 done")


def sync_test():
    print("--- sync test")
    worker = Worker()
    worker.planTasks()
    worker.syncWaitForTask(0)
    print("-> task0 done")
    worker.syncWaitForTask(2)
    print("-> task2 done")


asyncio.get_event_loop().run_until_complete(async_test())
sync_test()