# ASGI websocket stream support: handshake validation, message buffering and the stream state machine.
from __future__ import annotations
from enum import auto, Enum
from io import BytesIO, StringIO
from time import time
from typing import Any, Awaitable, Callable, Iterable, List, Optional, Tuple, Union
from urllib.parse import unquote
from wsproto.connection import Connection, ConnectionState, ConnectionType
from wsproto.events import (
BytesMessage,
CloseConnection,
Event as WSProtoEvent,
Message,
Ping,
TextMessage,
)
from wsproto.extensions import Extension, PerMessageDeflate
from wsproto.frame_protocol import CloseReason
from wsproto.handshake import server_extensions_handshake, WEBSOCKET_VERSION
from wsproto.utilities import generate_accept_token, split_comma_header
from .events import Body, Data, EndBody, EndData, Event, Request, Response, StreamClosed
from ..config import Config
from ..typing import (
AppWrapper,
ASGISendEvent,
TaskGroup,
WebsocketAcceptEvent,
WebsocketResponseBodyEvent,
WebsocketResponseStartEvent,
WebsocketScope,
WorkerContext,
)
from ..utils import (
build_and_validate_headers,
suppress_body,
UnexpectedMessageError,
valid_server_name,
)
class ASGIWebsocketState(Enum):
    """States a websocket stream moves through while talking to the ASGI app.

    Hypercorn supports the ASGI websocket HTTP response extension, which
    allows HTTP responses rather than acceptance; RESPONSE and HTTPCLOSED
    exist to track that alternative path.
    """

    # Initial state: the app has neither accepted nor rejected the request.
    HANDSHAKE = auto()
    # The app accepted the handshake; websocket frames may flow.
    CONNECTED = auto()
    # An HTTP response has been started instead of accepting the websocket.
    RESPONSE = auto()
    # The app closed an established websocket connection.
    CLOSED = auto()
    # The exchange ended with an HTTP response rather than a websocket.
    HTTPCLOSED = auto()
class FrameTooLargeError(Exception):
    """Signals that buffered websocket message data exceeded the allowed maximum."""
class Handshake:
    """Parsed view of the websocket-related headers of an opening request.

    The constructor extracts the relevant headers, ``is_valid`` checks them
    and ``accept`` builds the response status and headers together with a
    server-side wsproto ``Connection``.
    """

    def __init__(self, headers: List[Tuple[bytes, bytes]], http_version: str) -> None:
        """Collect the handshake headers from *headers*.

        Header names are compared case-insensitively. When a header occurs
        more than once, the last occurrence is the one kept.

        Args:
            headers: Raw ``(name, value)`` pairs of the request.
            http_version: HTTP version string of the request, e.g. ``"1.1"``.
        """
        self.http_version = http_version
        self.connection_tokens: Optional[List[str]] = None
        self.extensions: Optional[List[str]] = None
        self.key: Optional[bytes] = None
        self.subprotocols: Optional[List[str]] = None
        self.upgrade: Optional[bytes] = None
        self.version: Optional[bytes] = None
        for name, value in headers:
            name = name.lower()
            if name == b"connection":
                self.connection_tokens = split_comma_header(value)
            elif name == b"sec-websocket-extensions":
                self.extensions = split_comma_header(value)
            elif name == b"sec-websocket-key":
                self.key = value
            elif name == b"sec-websocket-protocol":
                self.subprotocols = split_comma_header(value)
            elif name == b"sec-websocket-version":
                self.version = value
            elif name == b"upgrade":
                self.upgrade = value

    def is_valid(self) -> bool:
        """Return whether the request is an acceptable websocket handshake.

        Versions that compare below ``"1.1"`` are rejected. An HTTP/1.1
        request must additionally carry a ``Sec-WebSocket-Key``, a
        ``Connection`` header containing an ``upgrade`` token and an
        ``Upgrade: websocket`` header. Every accepted request must declare
        the websocket version supported by wsproto.
        """
        if self.http_version < "1.1":
            return False
        elif self.http_version == "1.1":
            if self.key is None:
                return False
            if self.connection_tokens is None or not any(
                token.lower() == "upgrade" for token in self.connection_tokens
            ):
                return False
            # A request without an Upgrade header is simply invalid; guard
            # against None so it is rejected instead of raising.
            if self.upgrade is None or self.upgrade.lower() != b"websocket":
                return False

        if self.version != WEBSOCKET_VERSION:
            return False
        return True

    def accept(
        self,
        subprotocol: Optional[str],
        additional_headers: Iterable[Tuple[bytes, bytes]],
    ) -> Tuple[int, List[Tuple[bytes, bytes]], Connection]:
        """Build the response that accepts this handshake.

        Args:
            subprotocol: Subprotocol chosen by the application, or ``None``.
                It must be one of the subprotocols the client offered.
            additional_headers: Extra response headers from the application.

        Returns:
            A ``(status_code, headers, connection)`` tuple. The status is
            101 for HTTP/1.1 requests and 200 otherwise.

        Raises:
            Exception: If *subprotocol* was not offered by the client, or an
                additional header is ``sec-websocket-protocol`` or a
                pseudo-header (name starting with ``:``).
        """
        headers = []
        if subprotocol is not None:
            if self.subprotocols is None or subprotocol not in self.subprotocols:
                raise Exception("Invalid Subprotocol")
            else:
                headers.append((b"sec-websocket-protocol", subprotocol.encode()))

        # The same extension instances are handed to the Connection below, so
        # whatever is negotiated here applies to the resulting connection.
        extensions: List[Extension] = [PerMessageDeflate()]
        accepts = None
        if self.extensions is not None:
            accepts = server_extensions_handshake(self.extensions, extensions)

        if accepts:
            headers.append((b"sec-websocket-extensions", accepts))

        if self.key is not None:
            headers.append((b"sec-websocket-accept", generate_accept_token(self.key)))

        status_code = 200
        if self.http_version == "1.1":
            headers.extend([(b"upgrade", b"WebSocket"), (b"connection", b"Upgrade")])
            status_code = 101

        for name, value in additional_headers:
            if b"sec-websocket-protocol" == name or name.startswith(b":"):
                raise Exception(f"Invalid additional header, {name.decode()}")

            headers.append((name, value))

        return status_code, headers, Connection(ConnectionType.SERVER, extensions)
class WebsocketBuffer:
    """Collects the pieces of a single websocket message, enforcing a size cap."""

    def __init__(self, max_length: int) -> None:
        """Start empty; *max_length* is the largest total length tolerated."""
        self.value: Optional[Union[BytesIO, StringIO]] = None
        self.length = 0
        self.max_length = max_length

    def extend(self, event: Message) -> None:
        """Append the data of *event* to the buffer.

        Raises:
            FrameTooLargeError: If the accumulated length exceeds the limit.
        """
        if self.value is None:
            # The first piece decides whether text or binary data is stored.
            self.value = StringIO() if isinstance(event, TextMessage) else BytesIO()
        written = self.value.write(event.data)
        self.length += written
        if self.length > self.max_length:
            raise FrameTooLargeError()

    def clear(self) -> None:
        """Drop any buffered data and reset the length counter."""
        self.value, self.length = None, 0

    def to_message(self) -> dict:
        """Return the buffered data as a ``websocket.receive`` ASGI message.

        Exactly one of ``bytes``/``text`` is populated depending on the
        buffer type; both are ``None`` when nothing has been buffered.
        """
        message = {"type": "websocket.receive", "bytes": None, "text": None}
        if isinstance(self.value, BytesIO):
            message["bytes"] = self.value.getvalue()
        elif isinstance(self.value, StringIO):
            message["text"] = self.value.getvalue()
        return message
class WSStream:
    """A single websocket stream bridging the protocol layer and an ASGI app.

    Protocol events arrive through ``handle``; messages from the ASGI
    application arrive through ``app_send``. ``self.state`` records how far
    the exchange has progressed (see ``ASGIWebsocketState``).
    """

    def __init__(
        self,
        app: AppWrapper,
        config: Config,
        context: WorkerContext,
        task_group: TaskGroup,
        tls: Optional[dict[str, Any]],
        client: Optional[Tuple[str, int]],
        server: Optional[Tuple[str, int]],
        send: Callable[[Event], Awaitable[None]],
        stream_id: int,
    ) -> None:
        """Store the collaborators and initialise the stream in HANDSHAKE state.

        Args:
            app: The wrapped ASGI application.
            config: Server configuration.
            context: Worker context; its ``sleep`` is used for ping pacing.
            task_group: Used to spawn the app and the ping task.
            tls: TLS information; only its presence is inspected here, to
                choose the ``wss`` scheme over ``ws``.
            client: Client address placed in the ASGI scope.
            server: Server address placed in the ASGI scope.
            send: Coroutine function used to emit protocol events.
            stream_id: Identifier attached to every emitted event.
        """
        self.app = app
        self.app_put: Optional[Callable] = None
        self.buffer = WebsocketBuffer(config.websocket_max_message_size)
        self.client = client
        self.closed = False
        self.config = config
        self.context = context
        self.task_group = task_group
        self.response: WebsocketResponseStartEvent
        self.scope: WebsocketScope
        self.send = send
        # RFC 8441 for HTTP/2 says use http or https, ASGI says ws or wss
        self.scheme = "wss" if tls is not None else "ws"
        self.server = server
        self.start_time: float
        self.state = ASGIWebsocketState.HANDSHAKE
        self.stream_id = stream_id

        # Both are only assigned later: the handshake when the Request event
        # arrives, the connection once the app accepts.
        self.connection: Connection
        self.handshake: Handshake

    @property
    def idle(self) -> bool:
        """Whether the stream has reached the CLOSED or HTTPCLOSED state."""
        return self.state in {ASGIWebsocketState.CLOSED, ASGIWebsocketState.HTTPCLOSED}

    async def handle(self, event: Event) -> None:
        """Process a protocol event for this stream.

        A ``Request`` builds the ASGI scope and either rejects the request
        (404 for an invalid server name, 400 for an invalid handshake) or
        spawns the app and delivers ``websocket.connect``. ``Body``/``Data``
        bytes are fed to the wsproto connection. ``StreamClosed`` marks the
        stream closed and notifies the app with ``websocket.disconnect``.
        Events are ignored once the stream is closed.
        """
        if self.closed:
            return
        elif isinstance(event, Request):
            self.start_time = time()
            self.handshake = Handshake(event.headers, event.http_version)
            path, _, query_string = event.raw_path.partition(b"?")
            self.scope = {
                "type": "websocket",
                "asgi": {"spec_version": "2.3", "version": "3.0"},
                "scheme": self.scheme,
                "http_version": event.http_version,
                "path": unquote(path.decode("ascii")),
                "raw_path": path,
                "query_string": query_string,
                "root_path": self.config.root_path,
                "headers": event.headers,
                "client": self.client,
                "server": self.server,
                "subprotocols": self.handshake.subprotocols or [],
                "extensions": {"websocket.http.response": {}},
            }

            if not valid_server_name(self.config, event):
                await self._send_error_response(404)
                self.closed = True
            elif not self.handshake.is_valid():
                await self._send_error_response(400)
                self.closed = True
            else:
                self.app_put = await self.task_group.spawn_app(
                    self.app, self.config, self.scope, self.app_send
                )
                await self.app_put({"type": "websocket.connect"})
        elif isinstance(event, (Body, Data)):
            # NOTE(review): self.connection only exists after the app has
            # accepted; data arriving earlier would fail here - confirm the
            # protocol layer never delivers it before acceptance.
            self.connection.receive_data(event.data)
            await self._handle_events()
        elif isinstance(event, StreamClosed):
            self.closed = True
            if self.app_put is not None:
                if self.state in {ASGIWebsocketState.HTTPCLOSED, ASGIWebsocketState.CLOSED}:
                    code = CloseReason.NORMAL_CLOSURE.value
                else:
                    code = CloseReason.ABNORMAL_CLOSURE.value
                await self.app_put({"type": "websocket.disconnect", "code": code})

    async def app_send(self, message: Optional[ASGISendEvent]) -> None:
        """Process a message sent by the ASGI application.

        ``None`` means the application has finished; any exchange still in
        progress is then terminated (500 during the handshake, an
        internal-error close frame when connected) and the stream closed.

        Raises:
            TypeError: If a ``websocket.send`` message carries no bytes and
                its text is not a ``str``.
            UnexpectedMessageError: If the message type is not permitted in
                the current state.
        """
        if self.closed:
            # Allow app to finish after close
            return

        if message is None:  # ASGI App has finished sending messages
            # Cleanup if required
            if self.state == ASGIWebsocketState.HANDSHAKE:
                # _send_error_response writes the access log entry itself, so
                # no further logging is needed (it used to be logged twice).
                await self._send_error_response(500)
            elif self.state == ASGIWebsocketState.CONNECTED:
                await self._send_wsproto_event(CloseConnection(code=CloseReason.INTERNAL_ERROR))
            await self.send(StreamClosed(stream_id=self.stream_id))
        else:
            if message["type"] == "websocket.accept" and self.state == ASGIWebsocketState.HANDSHAKE:
                await self._accept(message)
            elif (
                message["type"] == "websocket.http.response.start"
                and self.state == ASGIWebsocketState.HANDSHAKE
            ):
                # Only remembered here; it is sent with the first body message.
                self.response = message
            elif message["type"] == "websocket.http.response.body" and self.state in {
                ASGIWebsocketState.HANDSHAKE,
                ASGIWebsocketState.RESPONSE,
            }:
                await self._send_rejection(message)
            elif message["type"] == "websocket.send" and self.state == ASGIWebsocketState.CONNECTED:
                event: WSProtoEvent
                if message.get("bytes") is not None:
                    event = BytesMessage(data=bytes(message["bytes"]))
                else:
                    # A missing "text" key is treated like None so that the
                    # app gets the descriptive TypeError, not a KeyError.
                    text = message.get("text")
                    if not isinstance(text, str):
                        raise TypeError(f"{text} should be a str")
                    event = TextMessage(data=text)
                await self._send_wsproto_event(event)
            elif (
                message["type"] == "websocket.close" and self.state == ASGIWebsocketState.HANDSHAKE
            ):
                # Closing before accepting rejects the handshake with a 403.
                self.state = ASGIWebsocketState.HTTPCLOSED
                await self._send_error_response(403)
            elif message["type"] == "websocket.close":
                self.state = ASGIWebsocketState.CLOSED
                await self._send_wsproto_event(
                    CloseConnection(
                        code=int(message.get("code", CloseReason.NORMAL_CLOSURE)),
                        reason=message.get("reason"),
                    )
                )
                await self.send(EndData(stream_id=self.stream_id))
            else:
                raise UnexpectedMessageError(self.state, message["type"])

    async def _handle_events(self) -> None:
        """Dispatch the events wsproto produced from the received data.

        Message pieces are buffered and handed to the app once complete,
        pings are answered, and a close from the peer ends the stream.
        """
        for event in self.connection.events():
            if isinstance(event, Message):
                try:
                    self.buffer.extend(event)
                except FrameTooLargeError:
                    await self._send_wsproto_event(
                        CloseConnection(code=CloseReason.MESSAGE_TOO_BIG)
                    )
                    # Stop processing further events after the size violation.
                    break

                if event.message_finished:
                    await self.app_put(self.buffer.to_message())
                    self.buffer.clear()
            elif isinstance(event, Ping):
                await self._send_wsproto_event(event.response())
            elif isinstance(event, CloseConnection):
                # Only reply with a close frame when the peer initiated it.
                if self.connection.state == ConnectionState.REMOTE_CLOSING:
                    await self._send_wsproto_event(event.response())
                await self.send(StreamClosed(stream_id=self.stream_id))

    async def _send_error_response(self, status_code: int) -> None:
        """Send an empty-bodied HTTP response and write its access log entry."""
        await self.send(
            Response(
                stream_id=self.stream_id,
                status_code=status_code,
                headers=[(b"content-length", b"0"), (b"connection", b"close")],
            )
        )
        await self.send(EndBody(stream_id=self.stream_id))
        await self.config.log.access(
            self.scope, {"status": status_code, "headers": []}, time() - self.start_time
        )

    async def _send_wsproto_event(self, event: WSProtoEvent) -> None:
        """Serialise *event* through the wsproto connection and send the bytes."""
        data = self.connection.send(event)
        await self.send(Data(stream_id=self.stream_id, data=data))

    async def _accept(self, message: WebsocketAcceptEvent) -> None:
        """Accept the websocket handshake as requested by the application.

        Sends the handshake response, writes the access log entry and, when
        a ping interval is configured, spawns the periodic ping task.

        Raises:
            Exception: Propagated from ``Handshake.accept`` when the chosen
                subprotocol or an additional header is invalid.
        """
        # Build the response first: if the handshake cannot be accepted the
        # stream must remain in HANDSHAKE rather than appear CONNECTED
        # without a wsproto connection.
        status_code, headers, self.connection = self.handshake.accept(
            message.get("subprotocol"), message.get("headers", [])
        )
        self.state = ASGIWebsocketState.CONNECTED
        await self.send(
            Response(stream_id=self.stream_id, status_code=status_code, headers=headers)
        )
        await self.config.log.access(
            self.scope, {"status": status_code, "headers": []}, time() - self.start_time
        )
        if self.config.websocket_ping_interval is not None:
            self.task_group.spawn(self._send_pings)

    async def _send_rejection(self, message: WebsocketResponseBodyEvent) -> None:
        """Send (part of) an HTTP response in place of accepting the websocket.

        The stored response start is emitted with the first body message;
        the final body message ends the response and writes the access log.
        """
        body_suppressed = suppress_body("GET", self.response["status"])
        if self.state == ASGIWebsocketState.HANDSHAKE:
            headers = build_and_validate_headers(self.response["headers"])
            await self.send(
                Response(
                    stream_id=self.stream_id,
                    status_code=int(self.response["status"]),
                    headers=headers,
                )
            )
            self.state = ASGIWebsocketState.RESPONSE
        if not body_suppressed:
            await self.send(Body(stream_id=self.stream_id, data=bytes(message.get("body", b""))))
        if not message.get("more_body", False):
            self.state = ASGIWebsocketState.HTTPCLOSED
            await self.send(EndBody(stream_id=self.stream_id))
            await self.config.log.access(self.scope, self.response, time() - self.start_time)

    async def _send_pings(self) -> None:
        """Send a ping every ``websocket_ping_interval`` until the stream closes."""
        while not self.closed:
            await self._send_wsproto_event(Ping())
            await self.context.sleep(self.config.websocket_ping_interval)
|