1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
|
# -*- coding: utf-8 -*-
import asyncio
from asyncio import Future, get_running_loop, sleep, wait_for
from typing import NoReturn
import pytest
import pytest_asyncio.plugin # noqa
try:
from asyncio.exceptions import TimeoutError # type: ignore
except ImportError:
from concurrent.futures import TimeoutError # type: ignore
from pyee.asyncio import AsyncIOEventEmitter
class PyeeTestError(Exception):
pass
@pytest.mark.asyncio
async def test_emit() -> None:
"""Test that AsyncIOEventEmitter can handle wrapping
coroutines
"""
ee = AsyncIOEventEmitter(loop=get_running_loop())
should_call: Future[bool] = Future(loop=get_running_loop())
@ee.on("event")
async def event_handler() -> None:
should_call.set_result(True)
ee.emit("event")
result = await wait_for(should_call, 0.1)
assert result is True
await asyncio.sleep(0)
assert ee.complete
@pytest.mark.asyncio
async def test_once_emit() -> None:
"""Test that AsyncIOEventEmitter also wrap coroutines when
using once
"""
ee = AsyncIOEventEmitter(loop=get_running_loop())
should_call: Future[bool] = Future(loop=get_running_loop())
@ee.once("event")
async def event_handler():
should_call.set_result(True)
ee.emit("event")
result = await wait_for(should_call, 0.1)
assert result is True
@pytest.mark.asyncio
async def test_error() -> None:
"""Test that AsyncIOEventEmitter can handle errors when
wrapping coroutines
"""
ee = AsyncIOEventEmitter(loop=get_running_loop())
should_call: Future[bool] = Future(loop=get_running_loop())
@ee.on("event")
async def event_handler() -> NoReturn:
raise PyeeTestError()
@ee.on("error")
def handle_error(exc):
should_call.set_result(exc)
ee.emit("event")
result = await wait_for(should_call, 0.1)
assert isinstance(result, PyeeTestError)
@pytest.mark.asyncio
async def test_future_canceled() -> None:
"""Test that AsyncIOEventEmitter can handle canceled Futures"""
cancel_me: Future[bool] = Future(loop=get_running_loop())
should_not_call: Future[None] = Future(loop=get_running_loop())
ee = AsyncIOEventEmitter(loop=get_running_loop())
@ee.on("event")
async def event_handler() -> None:
cancel_me.cancel()
@ee.on("error")
def handle_error(exc) -> None:
should_not_call.set_result(None)
ee.emit("event")
try:
await wait_for(should_not_call, 0.1)
except TimeoutError:
pass
else:
raise PyeeTestError()
@pytest.mark.asyncio
async def test_event_emitter_canceled() -> None:
"""Test that all running handlers in AsyncIOEventEmitter can be canceled"""
ee = AsyncIOEventEmitter(loop=get_running_loop())
should_not_call: Future[bool] = Future(loop=get_running_loop())
@ee.on("event")
async def event_handler():
await sleep(1)
should_not_call.set_result(True)
ee.emit("event")
await sleep(0)
# event_handler should still be running
assert not ee.complete
# cancel all pending tasks, including event_handler
ee.cancel()
await sleep(0)
# event_handler should be canceled
assert ee.complete
@pytest.mark.asyncio
async def test_wait_for_complete() -> None:
"""Test waiting for all pending tasks in an AsyncIOEventEmitter to
complete
"""
ee = AsyncIOEventEmitter(loop=get_running_loop())
@ee.on("event")
async def event_handler():
await sleep(0.1)
ee.emit("event")
await sleep(0)
# event_handler should still be running
assert not ee.complete
# wait for event_handler to complete execution
await ee.wait_for_complete()
# event_handler should have finished execution
assert ee.complete
@pytest.mark.asyncio
async def test_sync_error() -> None:
"""Test that regular functions have the same error handling as coroutines"""
ee = AsyncIOEventEmitter(loop=get_running_loop())
should_call: Future[Exception] = Future(loop=get_running_loop())
@ee.on("event")
def sync_handler():
raise PyeeTestError()
@ee.on("error")
def handle_error(exc):
should_call.set_result(exc)
ee.emit("event")
result = await wait_for(should_call, 0.1)
assert isinstance(result, PyeeTestError)
|