File: test_aiothreads.py

package info (click to toggle)
python-pytray 0.3.5-4
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 192 kB
  • sloc: python: 510; sh: 30; makefile: 3
file content (139 lines) | stat: -rw-r--r-- 3,942 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# -*- coding: utf-8 -*-
import asyncio
import concurrent.futures
import contextlib

import pytest

from pytray.aiothreads import LoopScheduler


@pytest.fixture
def loop_scheduler():
    loop = asyncio.new_event_loop()
    loop.set_debug(True)
    with LoopScheduler(loop=loop) as scheduler:
        yield scheduler


async def simple(arg):
    await asyncio.sleep(0.1)
    return arg


def test_simple_await_submit(loop_scheduler):  # pylint: disable=redefined-outer-name
    future = loop_scheduler.await_submit(simple("Done!"))
    assert future.result() == "Done!"


def test_simple_await(loop_scheduler):  # pylint: disable=redefined-outer-name
    result = loop_scheduler.await_(simple("Done!"))
    assert result == "Done!"


def test_async_context(loop_scheduler):  # pylint: disable=redefined-outer-name
    sequence = []

    @contextlib.asynccontextmanager
    async def do_():
        sequence.append("Entered")
        yield 10
        sequence.append("Exiting")

    with loop_scheduler.async_ctx(do_()) as value:
        assert value == 10

    assert sequence == ["Entered", "Exiting"]


def test_async_context_exception(
    loop_scheduler,
):  # pylint: disable=redefined-outer-name
    @contextlib.asynccontextmanager
    async def raises_before_yield():
        raise RuntimeError
        yield

    with pytest.raises(RuntimeError):
        with loop_scheduler.async_ctx(raises_before_yield()):
            pass

    @contextlib.asynccontextmanager
    async def raises_after_yield():
        yield
        raise RuntimeError

    with pytest.raises(RuntimeError):
        with loop_scheduler.async_ctx(raises_after_yield()):
            pass


def test_task_timeout():
    loop = asyncio.new_event_loop()
    loop.set_debug(True)

    # First check a normal (sub timeout) situation
    with LoopScheduler(loop=loop, timeout=0.1) as scheduler:
        # Make sure the sleep is bigger than our timeout
        scheduler.await_(asyncio.sleep(0.001))

    # Now one where we time out
    with pytest.raises(concurrent.futures.TimeoutError) as excinfo:
        with LoopScheduler(loop=loop, timeout=0.1) as scheduler:
            scheduler.await_(asyncio.sleep(1.0))
    assert asyncio.sleep.__name__ in str(excinfo.value)

    # Test supplying a custom name
    with pytest.raises(concurrent.futures.TimeoutError) as excinfo:
        with LoopScheduler(loop=loop, timeout=0.1) as scheduler:
            scheduler.await_(asyncio.sleep(1.0), name="sleepin'...zZzZ")
    assert "sleepin'...zZzZ" in str(excinfo.value)


def test_task_cancel(loop_scheduler):  # pylint: disable=redefined-outer-name
    evt = asyncio.Event()

    async def set_env():
        evt.set()

    loop_scheduler.await_submit(set_env()).result()
    assert evt.is_set()

    evt.clear()
    loop_scheduler.await_submit(set_env()).cancel()
    assert not evt.is_set()


def test_await_futures(loop_scheduler):  # pylint: disable=redefined-outer-name
    """Test that a series of Futures works correctly"""

    async def inception():
        fut = asyncio.Future()
        fut2 = asyncio.Future()
        fut3 = asyncio.Future()

        fut.set_result(True)
        fut2.set_result(fut)
        fut3.set_result(fut2)

        return fut3

    assert loop_scheduler.await_(inception()).result().result().result() is True


def test_await_ctx_futures(loop_scheduler):  # pylint: disable=redefined-outer-name
    """Test that an async context yielding a future is correctly handled i.e. the asyncio future
    should be converted to a concurrent one and the result be propagated back to the asyncio future
    in the context"""

    @contextlib.asynccontextmanager
    async def ctx():
        fut = asyncio.Future()
        try:
            yield fut
        finally:
            assert fut.result() is True

    with loop_scheduler.async_ctx(ctx()) as future:
        assert isinstance(future, concurrent.futures.Future)
        future.set_result(True)