1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
from __future__ import annotations
import asyncio
import logging
import signal
import sys
import warnings
from typing import Any
from gunicorn.arbiter import Arbiter
from gunicorn.workers.base import Worker
from uvicorn._compat import asyncio_run
from uvicorn.config import Config
from uvicorn.server import Server
warnings.warn(
"The `uvicorn.workers` module is deprecated. Please use `uvicorn-worker` package instead.\n"
"For more details, see https://github.com/Kludex/uvicorn-worker.",
DeprecationWarning,
)
class UvicornWorker(Worker):
"""
A worker class for Gunicorn that interfaces with an ASGI consumer callable,
rather than a WSGI callable.
"""
CONFIG_KWARGS: dict[str, Any] = {"loop": "auto", "http": "auto"}
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
logger = logging.getLogger("uvicorn.error")
logger.handlers = self.log.error_log.handlers
logger.setLevel(self.log.error_log.level)
logger.propagate = False
logger = logging.getLogger("uvicorn.access")
logger.handlers = self.log.access_log.handlers
logger.setLevel(self.log.access_log.level)
logger.propagate = False
config_kwargs: dict = {
"app": None,
"log_config": None,
"timeout_keep_alive": self.cfg.keepalive,
"timeout_notify": self.timeout,
"callback_notify": self.callback_notify,
"limit_max_requests": self.max_requests,
"forwarded_allow_ips": self.cfg.forwarded_allow_ips,
}
if self.cfg.is_ssl:
ssl_kwargs = {
"ssl_keyfile": self.cfg.ssl_options.get("keyfile"),
"ssl_certfile": self.cfg.ssl_options.get("certfile"),
"ssl_keyfile_password": self.cfg.ssl_options.get("password"),
"ssl_version": self.cfg.ssl_options.get("ssl_version"),
"ssl_cert_reqs": self.cfg.ssl_options.get("cert_reqs"),
"ssl_ca_certs": self.cfg.ssl_options.get("ca_certs"),
"ssl_ciphers": self.cfg.ssl_options.get("ciphers"),
}
config_kwargs.update(ssl_kwargs)
if self.cfg.settings["backlog"].value:
config_kwargs["backlog"] = self.cfg.settings["backlog"].value
config_kwargs.update(self.CONFIG_KWARGS)
self.config = Config(**config_kwargs)
def init_signals(self) -> None:
# Reset signals so Gunicorn doesn't swallow subprocess return codes
# other signals are set up by Server.install_signal_handlers()
# See: https://github.com/Kludex/uvicorn/issues/894
for s in self.SIGNALS:
signal.signal(s, signal.SIG_DFL)
signal.signal(signal.SIGUSR1, self.handle_usr1)
# Don't let SIGUSR1 disturb active requests by interrupting system calls
signal.siginterrupt(signal.SIGUSR1, False)
def _install_sigquit_handler(self) -> None:
"""Install a SIGQUIT handler on workers.
- https://github.com/Kludex/uvicorn/issues/1116
- https://github.com/benoitc/gunicorn/issues/2604
"""
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGQUIT, self.handle_exit, signal.SIGQUIT, None)
async def _serve(self) -> None:
self.config.app = self.wsgi
server = Server(config=self.config)
self._install_sigquit_handler()
await server.serve(sockets=self.sockets)
if not server.started:
sys.exit(Arbiter.WORKER_BOOT_ERROR)
def run(self) -> None:
return asyncio_run(self._serve(), loop_factory=self.config.get_loop_factory())
async def callback_notify(self) -> None:
self.notify()
class UvicornH11Worker(UvicornWorker):
CONFIG_KWARGS = {"loop": "asyncio", "http": "h11"}
|