1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
|
import asyncio
import pytest
from textual.app import App
from textual.worker import (
DeadlockError,
NoActiveWorker,
Worker,
WorkerCancelled,
WorkerError,
WorkerFailed,
WorkerState,
get_current_worker,
)
async def test_initialize():
"""Test initial values."""
def foo() -> str:
return "foo"
app = App()
async with app.run_test():
worker = Worker(app, foo, name="foo", group="foo-group", description="Foo test")
repr(worker)
assert worker.state == WorkerState.PENDING
assert not worker.is_cancelled
assert not worker.is_running
assert not worker.is_finished
assert worker.completed_steps == 0
assert worker.total_steps is None
assert worker.progress == 0.0
assert worker.result is None
async def test_run_success() -> None:
"""Test successful runs."""
def foo() -> str:
"""Regular function."""
return "foo"
async def bar() -> str:
"""Coroutine."""
return "bar"
async def baz() -> str:
"""Coroutine."""
return "baz"
class RunApp(App):
pass
app = RunApp()
async with app.run_test():
# Call regular function
foo_worker: Worker[str] = Worker(
app, foo, name="foo", group="foo-group", description="Foo test", thread=True
)
# Call coroutine function
bar_worker: Worker[str] = Worker(
app, bar, name="bar", group="bar-group", description="Bar test"
)
# Call coroutine
baz_worker: Worker[str] = Worker(
app, baz(), name="baz", group="baz-group", description="Baz test"
)
# Call coroutine function in a thread
bar_thread_worker: Worker[str] = Worker(
app, bar, name="bar", group="bar-group", description="Bar test", thread=True
)
# Call coroutine in a thread.
baz_thread_worker: Worker[str] = Worker(
app,
baz(),
name="baz",
group="baz-group",
description="Baz test",
thread=True,
)
assert foo_worker.result is None
assert bar_worker.result is None
assert baz_worker.result is None
assert bar_thread_worker.result is None
assert baz_thread_worker.result is None
foo_worker._start(app)
bar_worker._start(app)
baz_worker._start(app)
bar_thread_worker._start(app)
baz_thread_worker._start(app)
assert await foo_worker.wait() == "foo"
assert await bar_worker.wait() == "bar"
assert await baz_worker.wait() == "baz"
assert await bar_thread_worker.wait() == "bar"
assert await baz_thread_worker.wait() == "baz"
assert foo_worker.result == "foo"
assert bar_worker.result == "bar"
assert baz_worker.result == "baz"
assert bar_thread_worker.result == "bar"
assert baz_thread_worker.result == "baz"
async def test_run_error() -> None:
async def run_error() -> str:
await asyncio.sleep(0.1)
1 / 0
return "Never"
class ErrorApp(App):
pass
app = ErrorApp()
with pytest.raises(WorkerFailed):
async with app.run_test():
worker: Worker[str] = Worker(app, run_error)
worker._start(app)
await worker.wait()
async def test_run_cancel() -> None:
"""Test run may be cancelled."""
async def run_error() -> str:
await asyncio.sleep(0.1)
return "Never"
class ErrorApp(App):
pass
app = ErrorApp()
async with app.run_test():
worker: Worker[str] = Worker(app, run_error)
worker._start(app)
await asyncio.sleep(0)
worker.cancel()
assert worker.is_cancelled
with pytest.raises(WorkerCancelled):
await worker.wait()
async def test_run_cancel_immediately() -> None:
"""Edge case for cancelling immediately."""
async def run_error() -> str:
await asyncio.sleep(0.1)
return "Never"
class ErrorApp(App):
pass
app = ErrorApp()
async with app.run_test():
worker: Worker[str] = Worker(app, run_error)
worker._start(app)
worker.cancel()
assert worker.is_cancelled
with pytest.raises(WorkerCancelled):
await worker.wait()
async def test_get_worker() -> None:
"""Check get current worker."""
async def run_worker() -> Worker:
worker = get_current_worker()
return worker
class WorkerApp(App):
pass
app = WorkerApp()
async with app.run_test():
worker: Worker[Worker] = Worker(app, run_worker)
worker._start(app)
assert await worker.wait() is worker
def test_no_active_worker() -> None:
"""No active worker raises a specific exception."""
with pytest.raises(NoActiveWorker):
get_current_worker()
async def test_progress_update():
async def long_work():
pass
app = App()
async with app.run_test():
worker = Worker(app, long_work)
worker._start(app)
worker.update(total_steps=100)
assert worker.progress == 0
worker.advance(50)
assert worker.progress == 50
worker.update(completed_steps=23)
assert worker.progress == 73
async def test_double_start():
async def long_work():
return 0
app = App()
async with app.run_test():
worker = Worker(app, long_work)
worker._start(app)
worker._start(app)
assert await worker.wait() == 0
async def test_self_referential_deadlock():
async def self_referential_work():
await get_current_worker().wait()
app = App()
with pytest.raises(WorkerFailed) as exc:
async with app.run_test():
worker = Worker(app, self_referential_work)
worker._start(app)
await worker.wait()
assert exc.type is DeadlockError
async def test_wait_without_start():
async def work():
return
app = App()
async with app.run_test():
worker = Worker(app, work)
with pytest.raises(WorkerError):
await worker.wait()
|