File: test_asyncio.py

package info (click to toggle)
pyee 13.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 296 kB
  • sloc: python: 1,113; makefile: 4; sh: 1
file content (192 lines) | stat: -rw-r--r-- 4,358 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# -*- coding: utf-8 -*-

import asyncio
from asyncio import Future, get_running_loop, sleep, wait_for
from typing import NoReturn

import pytest
import pytest_asyncio.plugin  # noqa

try:
    from asyncio.exceptions import TimeoutError  # type: ignore
except ImportError:
    from concurrent.futures import TimeoutError  # type: ignore

from pyee.asyncio import AsyncIOEventEmitter


class PyeeTestError(Exception):
    pass


@pytest.mark.asyncio
async def test_emit() -> None:
    """Test that AsyncIOEventEmitter can handle wrapping
    coroutines
    """

    ee = AsyncIOEventEmitter(loop=get_running_loop())

    should_call: Future[bool] = Future(loop=get_running_loop())

    @ee.on("event")
    async def event_handler() -> None:
        should_call.set_result(True)

    ee.emit("event")

    result = await wait_for(should_call, 0.1)

    assert result is True

    await asyncio.sleep(0)
    assert ee.complete


@pytest.mark.asyncio
async def test_once_emit() -> None:
    """Test that AsyncIOEventEmitter also wrap coroutines when
    using once
    """

    ee = AsyncIOEventEmitter(loop=get_running_loop())

    should_call: Future[bool] = Future(loop=get_running_loop())

    @ee.once("event")
    async def event_handler():
        should_call.set_result(True)

    ee.emit("event")

    result = await wait_for(should_call, 0.1)

    assert result is True


@pytest.mark.asyncio
async def test_error() -> None:
    """Test that AsyncIOEventEmitter can handle errors when
    wrapping coroutines
    """
    ee = AsyncIOEventEmitter(loop=get_running_loop())

    should_call: Future[bool] = Future(loop=get_running_loop())

    @ee.on("event")
    async def event_handler() -> NoReturn:
        raise PyeeTestError()

    @ee.on("error")
    def handle_error(exc):
        should_call.set_result(exc)

    ee.emit("event")

    result = await wait_for(should_call, 0.1)

    assert isinstance(result, PyeeTestError)


@pytest.mark.asyncio
async def test_future_canceled() -> None:
    """Test that AsyncIOEventEmitter can handle canceled Futures"""

    cancel_me: Future[bool] = Future(loop=get_running_loop())
    should_not_call: Future[None] = Future(loop=get_running_loop())

    ee = AsyncIOEventEmitter(loop=get_running_loop())

    @ee.on("event")
    async def event_handler() -> None:
        cancel_me.cancel()

    @ee.on("error")
    def handle_error(exc) -> None:
        should_not_call.set_result(None)

    ee.emit("event")

    try:
        await wait_for(should_not_call, 0.1)
    except TimeoutError:
        pass
    else:
        raise PyeeTestError()


@pytest.mark.asyncio
async def test_event_emitter_canceled() -> None:
    """Test that all running handlers in AsyncIOEventEmitter can be canceled"""

    ee = AsyncIOEventEmitter(loop=get_running_loop())

    should_not_call: Future[bool] = Future(loop=get_running_loop())

    @ee.on("event")
    async def event_handler():
        await sleep(1)
        should_not_call.set_result(True)

    ee.emit("event")

    await sleep(0)

    # event_handler should still be running
    assert not ee.complete

    # cancel all pending tasks, including event_handler
    ee.cancel()

    await sleep(0)

    # event_handler should be canceled
    assert ee.complete


@pytest.mark.asyncio
async def test_wait_for_complete() -> None:
    """Test waiting for all pending tasks in an AsyncIOEventEmitter to
    complete
    """

    ee = AsyncIOEventEmitter(loop=get_running_loop())

    @ee.on("event")
    async def event_handler():
        await sleep(0.1)

    ee.emit("event")

    await sleep(0)

    # event_handler should still be running
    assert not ee.complete

    # wait for event_handler to complete execution
    await ee.wait_for_complete()

    # event_handler should have finished execution
    assert ee.complete


@pytest.mark.asyncio
async def test_sync_error() -> None:
    """Test that regular functions have the same error handling as coroutines"""
    ee = AsyncIOEventEmitter(loop=get_running_loop())

    should_call: Future[Exception] = Future(loop=get_running_loop())

    @ee.on("event")
    def sync_handler():
        raise PyeeTestError()

    @ee.on("error")
    def handle_error(exc):
        should_call.set_result(exc)

    ee.emit("event")

    result = await wait_for(should_call, 0.1)

    assert isinstance(result, PyeeTestError)