1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
import asyncio
import pytest
from aiormq.abc import TaskWrapper
from aiormq.base import FutureStore
@pytest.fixture
def root_store(loop):
    """Yield a ``FutureStore`` bound to ``loop`` and reject its futures on teardown.

    After the test body finishes (normally or with an error), ``reject_all``
    is called with ``Exception("Cancelling")`` and the resulting awaitable is
    run to completion on ``loop``.
    """
    top_level_store = FutureStore(loop=loop)
    try:
        yield top_level_store
    finally:
        # Teardown: reject whatever the test left registered in the store.
        pending_rejection = top_level_store.reject_all(Exception("Cancelling"))
        loop.run_until_complete(pending_rejection)
@pytest.fixture
def child_store(loop, root_store):
    """Yield a child store obtained from ``root_store.get_child()``.

    On teardown the child's ``reject_all`` is called with
    ``Exception("Cancelling")`` and run to completion on ``loop``.
    """
    nested_store = root_store.get_child()
    try:
        yield nested_store
    finally:
        # Teardown: reject whatever the test left registered in the child.
        pending_rejection = nested_store.reject_all(Exception("Cancelling"))
        loop.run_until_complete(pending_rejection)
async def test_reject_all(
    loop, root_store: FutureStore, child_store: FutureStore,
):
    """Rejecting the root store must fail futures of both root and child.

    A future is created in each store, ``reject_all(RuntimeError)`` is awaited
    on the root only, and then both futures are expected to carry a
    ``RuntimeError`` while both stores report no remaining futures.
    The ``loop`` fixture is requested but not referenced in the body.
    """
    stores = (root_store, child_store)

    root_future = root_store.create_future()
    child_future = child_store.create_future()

    # Both stores must be tracking something before the rejection.
    for store in stores:
        assert store.futures

    await root_store.reject_all(RuntimeError)
    # Give the loop a moment to run anything scheduled by the rejection.
    await asyncio.sleep(0.1)

    for rejected in (root_future, child_future):
        assert isinstance(rejected.exception(), RuntimeError)

    for store in stores:
        assert not store.futures
async def test_result(
    loop, root_store: FutureStore, child_store: FutureStore,
):
    """A task created through the child store yields the coroutine's return value.

    The ``loop`` and ``root_store`` fixtures are requested but not referenced
    in the body.
    """

    async def delayed_result():
        # Yield to the loop briefly before producing the value.
        await asyncio.sleep(0.1)
        return "result"

    outcome = await child_store.create_task(delayed_result())
    assert outcome == "result"
async def test_siblings(
    loop, root_store: FutureStore, child_store: FutureStore,
):
    """A task that rejects its own store must fail and leave related stores empty.

    Phase one runs the self-rejecting task in ``child_store``; phase two runs
    it in a store three ``get_child()`` levels below ``child_store``. In both
    phases the task is expected to raise ``RuntimeError`` and every store
    checked must end up with no futures.
    The ``loop`` fixture is requested but not referenced in the body.
    """

    async def reject_own_store(store):
        # Wait first so the caller can observe the task as registered.
        await asyncio.sleep(0.1)
        await store.reject_all(RuntimeError)

    # --- Phase one: task lives directly in child_store -------------------
    direct_task = child_store.create_task(reject_own_store(child_store))

    assert root_store.futures
    assert child_store.futures

    with pytest.raises(RuntimeError):
        await direct_task

    await asyncio.sleep(0.1)

    assert not root_store.futures
    assert not child_store.futures

    # --- Phase two: task lives three levels below child_store ------------
    level_one = child_store.get_child()
    level_two = level_one.get_child()
    deep_store = level_two.get_child()
    deep_task = deep_store.create_task(reject_own_store(deep_store))

    assert root_store.futures
    assert child_store.futures
    assert deep_store.futures

    with pytest.raises(RuntimeError):
        await deep_task

    await asyncio.sleep(0.1)

    assert not root_store.futures
    assert not child_store.futures
    assert not deep_store.futures
async def test_task_wrapper(loop):
    """Check what ``TaskWrapper.throw`` does to the wrapper and its future.

    After ``throw(RuntimeError())``, awaiting the wrapped future is expected
    to raise ``asyncio.CancelledError`` and awaiting the wrapper itself is
    expected to raise ``RuntimeError``.
    """
    inner_future = loop.create_future()
    wrapper = TaskWrapper(inner_future)

    wrapper.throw(RuntimeError())

    # The underlying future is awaited first, then the wrapper.
    with pytest.raises(asyncio.CancelledError):
        await inner_future

    with pytest.raises(RuntimeError):
        await wrapper
|