File: test_future_store.py

package info (click to toggle)
python-aiormq 6.8.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 392 kB
  • sloc: python: 3,214; makefile: 27
file content (106 lines) | stat: -rw-r--r-- 2,398 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import asyncio

import pytest

from aiormq.abc import TaskWrapper
from aiormq.base import FutureStore


@pytest.fixture
def root_store(loop):
    """Provide a top-level ``FutureStore`` created on the ``loop`` fixture.

    Once the test is over (whether it passed or failed), ``reject_all`` is
    called with a generic ``Exception("Cancelling")`` and driven to
    completion on the same loop.
    """
    top_level = FutureStore(loop=loop)
    try:
        yield top_level
    finally:
        # This fixture is a plain (non-async) generator, so the awaitable
        # returned by reject_all() has to be run on the loop explicitly.
        cleanup = top_level.reject_all(Exception("Cancelling"))
        loop.run_until_complete(cleanup)


@pytest.fixture
def child_store(loop, root_store):
    """Provide a store obtained from ``root_store.get_child()``.

    Once the test is over (whether it passed or failed), ``reject_all`` is
    called on the child with a generic ``Exception("Cancelling")`` and
    driven to completion on ``loop``.
    """
    nested = root_store.get_child()
    try:
        yield nested
    finally:
        # This fixture is a plain (non-async) generator, so the awaitable
        # returned by reject_all() has to be run on the loop explicitly.
        cleanup = nested.reject_all(Exception("Cancelling"))
        loop.run_until_complete(cleanup)


async def test_reject_all(
    loop, root_store: FutureStore, child_store: FutureStore,
):
    """Rejecting via the root store must fail futures of root and child.

    One future is created in each store, ``reject_all`` is awaited on the
    root only, and afterwards both futures are expected to carry a
    ``RuntimeError`` while both stores report no remaining futures.
    """
    stores = (root_store, child_store)
    root_future = root_store.create_future()
    child_future = child_store.create_future()

    # Each store must now have something in ``futures``.
    for level in stores:
        assert level.futures

    # The exception *class* is handed over; the assertions below expect
    # every future to report an *instance* of it.
    await root_store.reject_all(RuntimeError)
    # Give the loop a moment to process whatever the rejection scheduled.
    await asyncio.sleep(0.1)

    for rejected in (root_future, child_future):
        assert isinstance(rejected.exception(), RuntimeError)
    for level in stores:
        assert not level.futures


async def test_result(
    loop, root_store: FutureStore, child_store: FutureStore,
):
    """A task created through the child store must deliver its return value."""

    async def delayed_value():
        # Yield to the loop briefly before producing the value.
        await asyncio.sleep(0.1)
        return "result"

    task = child_store.create_task(delayed_value())
    outcome = await task
    assert outcome == "result"


async def test_siblings(
    loop, root_store: FutureStore, child_store: FutureStore,
):
    """Check ``reject_all`` issued from inside a task owned by the store.

    Scenario 1: a task created in ``child_store`` rejects ``child_store``.
    Scenario 2: a task created in a store three ``get_child()`` levels
    below ``child_store`` rejects that deep store.

    In both scenarios every store from the root down to the task's own
    store must show futures while the task is pending, awaiting the task
    must raise ``RuntimeError``, and all those stores must be empty
    shortly afterwards.
    """

    async def reject_later(store):
        # Let the caller inspect the stores first, then reject everything
        # in the store this task itself lives in.
        await asyncio.sleep(0.1)
        await store.reject_all(RuntimeError)

    # --- scenario 1: task created directly in child_store ---------------
    direct_task = child_store.create_task(reject_later(child_store))
    for level in (root_store, child_store):
        assert level.futures

    with pytest.raises(RuntimeError):
        await direct_task

    # Give the loop a moment before checking that the stores are drained.
    await asyncio.sleep(0.1)

    for level in (root_store, child_store):
        assert not level.futures

    # --- scenario 2: task created three levels below child_store --------
    deep_store = child_store
    for _ in range(3):
        deep_store = deep_store.get_child()
    deep_task = deep_store.create_task(reject_later(deep_store))

    lineage = (root_store, child_store, deep_store)
    for level in lineage:
        assert level.futures

    with pytest.raises(RuntimeError):
        await deep_task

    await asyncio.sleep(0.1)

    for level in lineage:
        assert not level.futures


async def test_task_wrapper(loop):
    """``TaskWrapper.throw`` must cancel the inner future but raise on the wrapper.

    After throwing a ``RuntimeError`` into the wrapper, awaiting the wrapped
    future is expected to raise ``asyncio.CancelledError`` whereas awaiting
    the wrapper itself is expected to raise the ``RuntimeError``.
    """
    inner = loop.create_future()
    wrapper = TaskWrapper(inner)

    error = RuntimeError()
    wrapper.throw(error)

    # The underlying future only sees a cancellation...
    with pytest.raises(asyncio.CancelledError):
        await inner

    # ...while the wrapper surfaces the exception that was thrown.
    with pytest.raises(RuntimeError):
        await wrapper