"""Tests for Starlette's ``BackgroundTask`` and ``BackgroundTasks``."""
import pytest
from starlette.background import BackgroundTask, BackgroundTasks
from starlette.responses import Response
from starlette.types import Receive, Scope, Send
from tests.types import TestClientFactory
def test_async_task(test_client_factory: TestClientFactory) -> None:
    """Check that an ``async def`` background task has run after the request.

    A coroutine function is wrapped in a ``BackgroundTask`` and attached to
    the response; the test asserts both the response body and that the
    coroutine flipped the flag.
    """
    finished = False

    async def mark_finished() -> None:
        # Rebind the flag owned by the enclosing test so the final
        # assertion can observe that this coroutine was executed.
        nonlocal finished
        finished = True

    background = BackgroundTask(mark_finished)

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        reply = Response(
            "task initiated",
            media_type="text/plain",
            background=background,
        )
        await reply(scope, receive, send)

    result = test_client_factory(app).get("/")

    assert result.text == "task initiated"
    assert finished
def test_sync_task(test_client_factory: TestClientFactory) -> None:
    """Check that a plain (non-async) background task has run after the request.

    Mirrors the async variant, but the callable handed to ``BackgroundTask``
    is an ordinary function.
    """
    finished = False

    def mark_finished() -> None:
        # Rebind the flag owned by the enclosing test so the final
        # assertion can observe that this function was executed.
        nonlocal finished
        finished = True

    background = BackgroundTask(mark_finished)

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        reply = Response(
            "task initiated",
            media_type="text/plain",
            background=background,
        )
        await reply(scope, receive, send)

    result = test_client_factory(app).get("/")

    assert result.text == "task initiated"
    assert finished
def test_multiple_tasks(test_client_factory: TestClientFactory) -> None:
    """Check that every task queued on a ``BackgroundTasks`` instance is run.

    Three tasks are queued, each adding a different amount to a shared
    counter; the test asserts the counter ends at the sum of those amounts.
    """
    amounts = (1, 2, 3)
    running_total = 0

    def increment(amount: int) -> None:
        # Accumulate into the counter owned by the enclosing test.
        nonlocal running_total
        running_total = running_total + amount

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        queued = BackgroundTasks()
        # Queue one task per amount, passing the amount as a keyword argument.
        for step in amounts:
            queued.add_task(increment, amount=step)
        reply = Response(
            "tasks initiated",
            media_type="text/plain",
            background=queued,
        )
        await reply(scope, receive, send)

    result = test_client_factory(app).get("/")

    assert result.text == "tasks initiated"
    assert running_total == sum(amounts)
def test_multi_tasks_failure_avoids_next_execution(
    test_client_factory: TestClientFactory,
) -> None:
    """Check that a failing task stops the remaining queued tasks from running.

    The same callable is queued twice. It raises on its first invocation, so
    the test expects the request to raise and the call counter to stay at 1,
    i.e. the second queued task was never executed.
    """
    calls = 0

    def increment() -> None:
        nonlocal calls
        calls += 1
        # Only the first invocation fails; the else-branch is never expected
        # to be taken, hence the coverage pragma.
        if calls == 1:  # pragma: no branch
            raise Exception("task failed")

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        queued = BackgroundTasks()
        for _ in range(2):
            queued.add_task(increment)
        reply = Response(
            "tasks initiated",
            media_type="text/plain",
            background=queued,
        )
        await reply(scope, receive, send)

    http = test_client_factory(app)
    with pytest.raises(Exception):
        http.get("/")

    assert calls == 1
|