1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
|
# This has to be done first as Twisted is import-order-sensitive with reactors
import asyncio # isort:skip
import os # isort:skip
import sys # isort:skip
import warnings # isort:skip
from concurrent.futures import ThreadPoolExecutor # isort:skip
from twisted.internet import asyncioreactor # isort:skip
twisted_loop = asyncio.new_event_loop()
if "ASGI_THREADS" in os.environ:
twisted_loop.set_default_executor(
ThreadPoolExecutor(max_workers=int(os.environ["ASGI_THREADS"]))
)
current_reactor = sys.modules.get("twisted.internet.reactor", None)
if current_reactor is not None:
if not isinstance(current_reactor, asyncioreactor.AsyncioSelectorReactor):
warnings.warn(
"Something has already installed a non-asyncio Twisted reactor. Attempting to uninstall it; "
+ "you can fix this warning by importing daphne.server early in your codebase or "
+ "finding the package that imports Twisted and importing it later on.",
UserWarning,
stacklevel=2,
)
del sys.modules["twisted.internet.reactor"]
asyncioreactor.install(twisted_loop)
else:
asyncioreactor.install(twisted_loop)
import logging
import time
from concurrent.futures import CancelledError
from functools import partial
from twisted.internet import defer, reactor
from twisted.internet.endpoints import serverFromString
from twisted.logger import STDLibLogObserver, globalLogBeginner
from twisted.web import http
from .http_protocol import HTTPFactory
from .ws_protocol import WebSocketFactory
logger = logging.getLogger(__name__)
class Server:
def __init__(
self,
application,
endpoints=None,
signal_handlers=True,
action_logger=None,
http_timeout=None,
request_buffer_size=8192,
websocket_timeout=86400,
websocket_connect_timeout=20,
ping_interval=20,
ping_timeout=30,
root_path="",
proxy_forwarded_address_header=None,
proxy_forwarded_port_header=None,
proxy_forwarded_proto_header=None,
verbosity=1,
websocket_handshake_timeout=5,
application_close_timeout=10,
ready_callable=None,
server_name="daphne",
):
self.application = application
self.endpoints = endpoints or []
self.listeners = []
self.listening_addresses = []
self.signal_handlers = signal_handlers
self.action_logger = action_logger
self.http_timeout = http_timeout
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.request_buffer_size = request_buffer_size
self.proxy_forwarded_address_header = proxy_forwarded_address_header
self.proxy_forwarded_port_header = proxy_forwarded_port_header
self.proxy_forwarded_proto_header = proxy_forwarded_proto_header
self.websocket_timeout = websocket_timeout
self.websocket_connect_timeout = websocket_connect_timeout
self.websocket_handshake_timeout = websocket_handshake_timeout
self.application_close_timeout = application_close_timeout
self.root_path = root_path
self.verbosity = verbosity
self.abort_start = False
self.ready_callable = ready_callable
self.server_name = server_name
# Check our construction is actually sensible
if not self.endpoints:
logger.error("No endpoints. This server will not listen on anything.")
sys.exit(1)
def run(self):
# A dict of protocol: {"application_instance":, "connected":, "disconnected":} dicts
self.connections = {}
# Make the factory
self.http_factory = HTTPFactory(self)
self.ws_factory = WebSocketFactory(self, server=self.server_name)
self.ws_factory.setProtocolOptions(
autoPingTimeout=self.ping_timeout,
allowNullOrigin=True,
openHandshakeTimeout=self.websocket_handshake_timeout,
)
if self.verbosity <= 1:
# Redirect the Twisted log to nowhere
globalLogBeginner.beginLoggingTo(
[lambda _: None], redirectStandardIO=False, discardBuffer=True
)
else:
globalLogBeginner.beginLoggingTo([STDLibLogObserver(__name__)])
# Detect what Twisted features are enabled
if http.H2_ENABLED:
logger.info("HTTP/2 support enabled")
else:
logger.info(
"HTTP/2 support not enabled (install the http2 and tls Twisted extras)"
)
# Kick off the timeout loop
reactor.callLater(1, self.application_checker)
reactor.callLater(2, self.timeout_checker)
for socket_description in self.endpoints:
logger.info("Configuring endpoint %s", socket_description)
ep = serverFromString(reactor, str(socket_description))
listener = ep.listen(self.http_factory)
listener.addCallback(self.listen_success)
listener.addErrback(self.listen_error)
self.listeners.append(listener)
# Set the asyncio reactor's event loop as global
# TODO: Should we instead pass the global one into the reactor?
asyncio.set_event_loop(reactor._asyncioEventloop)
# Verbosity 3 turns on asyncio debug to find those blocking yields
if self.verbosity >= 3:
asyncio.get_event_loop().set_debug(True)
reactor.addSystemEventTrigger("before", "shutdown", self.kill_all_applications)
if not self.abort_start:
# Trigger the ready flag if we had one
if self.ready_callable:
self.ready_callable()
# Run the reactor
reactor.run(installSignalHandlers=self.signal_handlers)
def listen_success(self, port):
"""
Called when a listen succeeds so we can store port details (if there are any)
"""
if hasattr(port, "getHost"):
host = port.getHost()
if hasattr(host, "host") and hasattr(host, "port"):
self.listening_addresses.append((host.host, host.port))
logger.info(
"Listening on TCP address %s:%s",
port.getHost().host,
port.getHost().port,
)
def listen_error(self, failure):
logger.critical("Listen failure: %s", failure.getErrorMessage())
self.stop()
def stop(self):
"""
Force-stops the server.
"""
if reactor.running:
reactor.stop()
else:
self.abort_start = True
### Protocol handling
def protocol_connected(self, protocol):
"""
Adds a protocol as a current connection.
"""
if protocol in self.connections:
raise RuntimeError("Protocol %r was added to main list twice!" % protocol)
self.connections[protocol] = {"connected": time.time()}
def protocol_disconnected(self, protocol):
# Set its disconnected time (the loops will come and clean it up)
# Do not set it if it is already set. Overwriting it might
# cause it to never be cleaned up.
# See https://github.com/django/channels/issues/1181
if "disconnected" not in self.connections[protocol]:
self.connections[protocol]["disconnected"] = time.time()
### Internal event/message handling
def create_application(self, protocol, scope):
"""
Creates a new application instance that fronts a Protocol instance
for one of our supported protocols. Pass it the protocol,
and it will work out the type, supply appropriate callables, and
return you the application's input queue
"""
# Make sure the protocol has not had another application made for it
assert "application_instance" not in self.connections[protocol]
# Make an instance of the application
input_queue = asyncio.Queue()
scope.setdefault("asgi", {"version": "3.0"})
application_instance = self.application(
scope=scope,
receive=input_queue.get,
send=partial(self.handle_reply, protocol),
)
# Run it, and stash the future for later checking
if protocol not in self.connections:
return None
self.connections[protocol]["application_instance"] = asyncio.ensure_future(
application_instance,
loop=asyncio.get_event_loop(),
)
return input_queue
async def handle_reply(self, protocol, message):
"""
Coroutine that jumps the reply message from asyncio to Twisted
"""
# Don't do anything if the connection is closed or does not exist
if protocol not in self.connections or self.connections[protocol].get(
"disconnected", None
):
return
try:
self.check_headers_type(message)
except ValueError:
# Ensure to send SOME reply.
protocol.basic_error(500, b"Server Error", "Server Error")
raise
# Let the protocol handle it
protocol.handle_reply(message)
@staticmethod
def check_headers_type(message):
if not message["type"] == "http.response.start":
return
for k, v in message.get("headers", []):
if not isinstance(k, bytes):
raise ValueError(
"Header name '{}' expected to be `bytes`, but got `{}`".format(
k, type(k)
)
)
if not isinstance(v, bytes):
raise ValueError(
"Header value '{}' expected to be `bytes`, but got `{}`".format(
v, type(v)
)
)
### Utility
def application_checker(self):
"""
Goes through the set of current application Futures and cleans up
any that are done/prints exceptions for any that errored.
"""
for protocol, details in list(self.connections.items()):
disconnected = details.get("disconnected", None)
application_instance = details.get("application_instance", None)
# First, see if the protocol disconnected and the app has taken
# too long to close up
if (
disconnected
and time.time() - disconnected > self.application_close_timeout
):
if application_instance and not application_instance.done():
logger.warning(
"Application instance %r for connection %s took too long to shut down and was killed.",
application_instance,
repr(protocol),
)
application_instance.cancel()
# Then see if the app is done and we should reap it
if application_instance and application_instance.done():
try:
exception = application_instance.exception()
except (CancelledError, asyncio.CancelledError):
# Future cancellation. We can ignore this.
pass
else:
if exception:
if isinstance(exception, KeyboardInterrupt):
# Protocol is asking the server to exit (likely during test)
self.stop()
else:
logger.error(
"Exception inside application: %s",
exception,
exc_info=exception,
)
if not disconnected:
protocol.handle_exception(exception)
del self.connections[protocol]["application_instance"]
application_instance = None
# Check to see if protocol is closed and app is closed so we can remove it
if not application_instance and disconnected:
del self.connections[protocol]
reactor.callLater(1, self.application_checker)
def kill_all_applications(self):
"""
Kills all application coroutines before reactor exit.
"""
# Send cancel to all coroutines
wait_for = []
for details in self.connections.values():
application_instance = details["application_instance"]
if not application_instance.done():
application_instance.cancel()
wait_for.append(application_instance)
logger.info("Killed %i pending application instances", len(wait_for))
# Make Twisted wait until they're all dead
wait_deferred = defer.Deferred.fromFuture(asyncio.gather(*wait_for))
wait_deferred.addErrback(lambda x: None)
return wait_deferred
def timeout_checker(self):
"""
Called periodically to enforce timeout rules on all connections.
Also checks pings at the same time.
"""
for protocol in list(self.connections.keys()):
protocol.check_timeouts()
reactor.callLater(2, self.timeout_checker)
def log_action(self, protocol, action, details):
"""
Dispatches to any registered action logger, if there is one.
"""
if self.action_logger:
self.action_logger(protocol, action, details)
|