1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
import asyncio
import os
from collections import defaultdict
from collections.abc import MutableSequence
from contextlib import suppress
from molotov.util import cancellable_sleep
class Tasks(MutableSequence):
"""Manages tasks lifecycles across processes."""
def __init__(self):
self._tasks = defaultdict(list)
def _get_tasks(self):
return self._tasks[os.getpid()]
def __len__(self):
return len(self._get_tasks())
def __getitem__(self, i):
return self._get_tasks()[i]
def __delitem__(self, i):
del self._get_tasks()[i]
def __setitem__(self, i, v):
self._get_tasks()[i] = v
def insert(self, i, v):
return self._get_tasks().insert(i, v)
def __str__(self):
return str(self._get_tasks())
def ensure_future(self, coro):
fut = asyncio.ensure_future(coro)
self.append(fut)
return fut
async def gather(self):
return await asyncio.gather(*self._get_tasks())
async def cancel_all(self):
cancellable_sleep.cancel_all()
for task in reversed(self._get_tasks()):
if not task.done():
with suppress(asyncio.CancelledError):
task.cancel()
await task
for task in self._get_tasks():
del task
self.reset_tasks()
def reset_tasks(self):
self._tasks[os.getpid()][:] = []
|