1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
|
import asyncio
from time import sleep
from typing import Callable, List, Tuple
import pytest
from textual import work
from textual._work_decorator import WorkerDeclarationError
from textual.app import App
from textual.worker import Worker, WorkerState, WorkType
class WorkApp(App):
worker: Worker
def __init__(self) -> None:
super().__init__()
self.states: list[WorkerState] = []
@work
async def async_work(self) -> str:
await asyncio.sleep(0.1)
return "foo"
@work(thread=True)
async def async_thread_work(self) -> str:
await asyncio.sleep(0.1)
return "foo"
@work(thread=True)
def thread_work(self) -> str:
sleep(0.1)
return "foo"
def launch(self, worker) -> None:
self.worker = worker()
def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
self.states.append(event.state)
async def work_with(launcher: Callable[[WorkApp], WorkType]) -> None:
"""Core code for testing a work decorator."""
app = WorkApp()
async with app.run_test() as pilot:
app.launch(launcher(app))
await app.workers.wait_for_complete()
result = await app.worker.wait()
assert result == "foo"
await pilot.pause()
assert app.states == [
WorkerState.PENDING,
WorkerState.RUNNING,
WorkerState.SUCCESS,
]
async def test_async_work() -> None:
"""It should be possible to decorate an async method as an async worker."""
await work_with(lambda app: app.async_work)
async def test_async_thread_work() -> None:
"""It should be possible to decorate an async method as a thread worker."""
await work_with(lambda app: app.async_thread_work)
async def test_thread_work() -> None:
"""It should be possible to decorate a non-async method as a thread worker."""
await work_with(lambda app: app.thread_work)
def test_decorate_non_async_no_thread_argument() -> None:
"""Decorating a non-async method without saying explicitly that it's a thread is an error."""
with pytest.raises(WorkerDeclarationError):
class _(App[None]):
@work
def foo(self) -> None:
pass
def test_decorate_non_async_no_thread_is_false() -> None:
"""Decorating a non-async method and saying it isn't a thread is an error."""
with pytest.raises(WorkerDeclarationError):
class _(App[None]):
@work(thread=False)
def foo(self) -> None:
pass
class NestedWorkersApp(App[None]):
def __init__(self, call_stack: List[str]):
self.call_stack = call_stack
super().__init__()
def call_from_stack(self):
if self.call_stack:
call_now = self.call_stack.pop()
getattr(self, call_now)()
@work(thread=False)
async def async_no_thread(self):
self.call_from_stack()
@work(thread=True)
async def async_thread(self):
self.call_from_stack()
@work(thread=True)
def thread(self):
self.call_from_stack()
@pytest.mark.parametrize(
"call_stack",
[ # from itertools import product; list(product("async_no_thread async_thread thread".split(), repeat=3))
("async_no_thread", "async_no_thread", "async_no_thread"),
("async_no_thread", "async_no_thread", "async_thread"),
("async_no_thread", "async_no_thread", "thread"),
("async_no_thread", "async_thread", "async_no_thread"),
("async_no_thread", "async_thread", "async_thread"),
("async_no_thread", "async_thread", "thread"),
("async_no_thread", "thread", "async_no_thread"),
("async_no_thread", "thread", "async_thread"),
("async_no_thread", "thread", "thread"),
("async_thread", "async_no_thread", "async_no_thread"),
("async_thread", "async_no_thread", "async_thread"),
("async_thread", "async_no_thread", "thread"),
("async_thread", "async_thread", "async_no_thread"),
("async_thread", "async_thread", "async_thread"),
("async_thread", "async_thread", "thread"),
("async_thread", "thread", "async_no_thread"),
("async_thread", "thread", "async_thread"),
("async_thread", "thread", "thread"),
("thread", "async_no_thread", "async_no_thread"),
("thread", "async_no_thread", "async_thread"),
("thread", "async_no_thread", "thread"),
("thread", "async_thread", "async_no_thread"),
("thread", "async_thread", "async_thread"),
("thread", "async_thread", "thread"),
("thread", "thread", "async_no_thread"),
("thread", "thread", "async_thread"),
("thread", "thread", "thread"),
( # Plus a longer chain to stress test this mechanism.
"async_no_thread",
"async_no_thread",
"thread",
"thread",
"async_thread",
"async_thread",
"async_no_thread",
"async_thread",
"async_no_thread",
"async_thread",
"thread",
"async_thread",
"async_thread",
"async_no_thread",
"async_no_thread",
"thread",
"thread",
"async_no_thread",
"async_no_thread",
"thread",
"async_no_thread",
"thread",
"thread",
),
],
)
async def test_calling_workers_from_within_workers(call_stack: Tuple[str]):
"""Regression test for https://github.com/Textualize/textual/issues/3472.
This makes sure we can nest worker calls without a problem.
"""
app = NestedWorkersApp(list(call_stack))
async with app.run_test():
app.call_from_stack()
# We need multiple awaits because we're creating a chain of workers that may
# have multiple async workers, each of which may need the await to have enough
# time to call the next one in the chain.
for _ in range(len(call_stack)):
await app.workers.wait_for_complete()
assert app.call_stack == []
|