1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
from __future__ import annotations
import asyncio
from functools import partial
from typing import Any, Callable
from ..config import Config
from ..typing import AppWrapper, ASGIReceiveEvent, ASGISendEvent, LifespanScope
from ..utils import LifespanFailureError, LifespanTimeoutError
class UnexpectedMessageError(Exception):
    """Raised when the application sends a message type the lifespan handler does not recognise."""
class Lifespan:
    """Drive the ASGI lifespan protocol for one application on an asyncio loop.

    ``handle_lifespan`` runs the application with a ``lifespan`` scope.
    ``wait_for_startup`` and ``wait_for_shutdown`` queue the matching event
    for the application and then wait, bounded by the configured timeout,
    until the application answers through ``asgi_send``.
    """

    def __init__(self, app: AppWrapper, config: Config, loop: asyncio.AbstractEventLoop) -> None:
        """Store the collaborators and create the synchronisation primitives.

        Args:
            app: The wrapped ASGI application to call with the lifespan scope.
            config: Supplies ``max_app_queue_size``, ``startup_timeout``,
                ``shutdown_timeout`` and the ``log`` object used here.
            loop: Event loop used for executor calls and for coroutines
                submitted through the ``_call_soon`` helper.
        """
        self.app = app
        self.config = config
        self.startup = asyncio.Event()
        self.shutdown = asyncio.Event()
        # Events queued here are handed to the application by ``asgi_receive``.
        self.app_queue: asyncio.Queue = asyncio.Queue(config.max_app_queue_size)
        # Cleared when the application raises while handling the lifespan scope.
        self.supported = True
        self.loop = loop

        # This mimics the Trio nursery.start task_status and is
        # required to ensure the support has been checked before
        # waiting on timeouts.
        self._started = asyncio.Event()

    async def handle_lifespan(self) -> None:
        """Call the application with a lifespan scope and classify any failure.

        Whatever the outcome, both ``startup`` and ``shutdown`` are set on
        exit so that nothing is left waiting on them.

        Raises:
            asyncio.CancelledError: The coroutine was cancelled.
            LifespanFailureError: The application reported a startup or
                shutdown failure via ``asgi_send``.
        """
        self._started.set()
        scope: LifespanScope = {
            "type": "lifespan",
            "asgi": {"spec_version": "2.0", "version": "3.0"},
        }

        def _call_soon(func: Callable, *args: Any) -> Any:
            # Submit ``func(*args)`` to the server loop and block the caller
            # until the resulting coroutine has produced its result.
            future = asyncio.run_coroutine_threadsafe(func(*args), self.loop)
            return future.result()

        try:
            await self.app(
                scope,
                self.asgi_receive,
                self.asgi_send,
                partial(self.loop.run_in_executor, None),
                _call_soon,
            )
        except asyncio.CancelledError:
            # Before Python 3.8 CancelledError derived from Exception, so the
            # generic handler below would swallow a cancellation and wrongly
            # mark lifespan as unsupported. Always let it propagate.
            raise
        except LifespanFailureError:
            # Lifespan failures should crash the server
            raise
        except Exception:
            self.supported = False
            # Pick the log message from how far the protocol had progressed.
            if not self.startup.is_set():
                await self.config.log.warning(
                    "ASGI Framework Lifespan error, continuing without Lifespan support"
                )
            elif not self.shutdown.is_set():
                await self.config.log.exception(
                    "ASGI Framework Lifespan error, shutdown without Lifespan support"
                )
            else:
                await self.config.log.exception("ASGI Framework Lifespan errored after shutdown.")
        finally:
            self.startup.set()
            self.shutdown.set()

    async def wait_for_startup(self) -> None:
        """Send ``lifespan.startup`` to the application and await its answer.

        Returns immediately when lifespan has been marked unsupported.

        Raises:
            LifespanTimeoutError: ``startup`` was not set within
                ``config.startup_timeout``.
        """
        await self._started.wait()
        if not self.supported:
            return

        await self.app_queue.put({"type": "lifespan.startup"})
        try:
            await asyncio.wait_for(self.startup.wait(), timeout=self.config.startup_timeout)
        except asyncio.TimeoutError as error:
            raise LifespanTimeoutError("startup") from error

    async def wait_for_shutdown(self) -> None:
        """Send ``lifespan.shutdown`` to the application and await its answer.

        Returns immediately when lifespan has been marked unsupported.

        Raises:
            LifespanTimeoutError: ``shutdown`` was not set within
                ``config.shutdown_timeout``.
        """
        await self._started.wait()
        if not self.supported:
            return

        await self.app_queue.put({"type": "lifespan.shutdown"})
        try:
            await asyncio.wait_for(self.shutdown.wait(), timeout=self.config.shutdown_timeout)
        except asyncio.TimeoutError as error:
            raise LifespanTimeoutError("shutdown") from error

    async def asgi_receive(self) -> ASGIReceiveEvent:
        """Return the next event queued for the application, waiting if none is queued."""
        return await self.app_queue.get()

    async def asgi_send(self, message: ASGISendEvent) -> None:
        """Handle a lifespan message sent by the application.

        Raises:
            LifespanFailureError: For ``lifespan.startup.failed`` and
                ``lifespan.shutdown.failed``; the matching event is set first.
            UnexpectedMessageError: For any other unrecognised message type.
        """
        message_type = message["type"]
        if message_type == "lifespan.startup.complete":
            self.startup.set()
        elif message_type == "lifespan.shutdown.complete":
            self.shutdown.set()
        elif message_type == "lifespan.startup.failed":
            self.startup.set()
            raise LifespanFailureError("startup", message.get("message", ""))
        elif message_type == "lifespan.shutdown.failed":
            self.shutdown.set()
            raise LifespanFailureError("shutdown", message.get("message", ""))
        else:
            raise UnexpectedMessageError(message_type)
|