1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
|
import http
import socket
import sys
import unittest
from unittest.mock import patch
from websockets.asyncio.client import connect, unix_connect
from websockets.asyncio.router import *
from websockets.exceptions import InvalidStatus
from ..utils import CLIENT_CONTEXT, SERVER_CONTEXT, temp_unix_socket_path
from .server import EvalShellMixin, get_uri, handler
from .utils import alist
try:
from werkzeug.routing import Map, Rule
except ImportError:
pass
async def echo(websocket, count):
message = await websocket.recv()
for _ in range(count):
await websocket.send(message)
@unittest.skipUnless("werkzeug" in sys.modules, "werkzeug not installed")
class RouterTests(EvalShellMixin, unittest.IsolatedAsyncioTestCase):
# This is a small realistic example of werkzeug's basic URL routing
# features: path matching, parameter extraction, and default values.
async def test_router_matches_paths_and_extracts_parameters(self):
"""Router matches paths and extracts parameters."""
url_map = Map(
[
Rule("/echo", defaults={"count": 1}, endpoint=echo),
Rule("/echo/<int:count>", endpoint=echo),
]
)
async with route(url_map, "localhost", 0) as server:
async with connect(get_uri(server) + "/echo") as client:
await client.send("hello")
messages = await alist(client)
self.assertEqual(messages, ["hello"])
async with connect(get_uri(server) + "/echo/3") as client:
await client.send("hello")
messages = await alist(client)
self.assertEqual(messages, ["hello", "hello", "hello"])
@property # avoids an import-time dependency on werkzeug
def url_map(self):
return Map(
[
Rule("/", endpoint=handler),
Rule("/r", redirect_to="/"),
]
)
async def test_route_with_query_string(self):
"""Router ignores query strings when matching paths."""
async with route(self.url_map, "localhost", 0) as server:
async with connect(get_uri(server) + "/?a=b") as client:
await self.assertEval(client, "ws.request.path", "/?a=b")
async def test_redirect(self):
"""Router redirects connections according to redirect_to."""
async with route(self.url_map, "localhost", 0) as server:
async with connect(get_uri(server) + "/r") as client:
await self.assertEval(client, "ws.request.path", "/")
async def test_secure_redirect(self):
"""Router redirects connections to a wss:// URI when TLS is enabled."""
async with route(self.url_map, "localhost", 0, ssl=SERVER_CONTEXT) as server:
async with connect(get_uri(server) + "/r", ssl=CLIENT_CONTEXT) as client:
await self.assertEval(client, "ws.request.path", "/")
@patch("websockets.asyncio.client.connect.process_redirect", lambda _, exc: exc)
async def test_force_secure_redirect(self):
"""Router redirects ws:// connections to a wss:// URI when ssl=True."""
async with route(self.url_map, "localhost", 0, ssl=True) as server:
redirect_uri = get_uri(server, secure=True)
with self.assertRaises(InvalidStatus) as raised:
async with connect(get_uri(server) + "/r"):
self.fail("did not raise")
self.assertEqual(
raised.exception.response.headers["Location"],
redirect_uri + "/",
)
@patch("websockets.asyncio.client.connect.process_redirect", lambda _, exc: exc)
async def test_force_redirect_server_name(self):
"""Router redirects connections to the host declared in server_name."""
async with route(self.url_map, "localhost", 0, server_name="other") as server:
with self.assertRaises(InvalidStatus) as raised:
async with connect(get_uri(server) + "/r"):
self.fail("did not raise")
self.assertEqual(
raised.exception.response.headers["Location"],
"ws://other/",
)
async def test_not_found(self):
"""Router rejects requests to unknown paths with an HTTP 404 error."""
async with route(self.url_map, "localhost", 0) as server:
with self.assertRaises(InvalidStatus) as raised:
async with connect(get_uri(server) + "/n"):
self.fail("did not raise")
self.assertEqual(
str(raised.exception),
"server rejected WebSocket connection: HTTP 404",
)
async def test_process_request_function_returning_none(self):
"""Router supports a process_request function returning None."""
def process_request(ws, request):
ws.process_request_ran = True
async with route(
self.url_map, "localhost", 0, process_request=process_request
) as server:
async with connect(get_uri(server) + "/") as client:
await self.assertEval(client, "ws.process_request_ran", "True")
async def test_process_request_coroutine_returning_none(self):
"""Router supports a process_request coroutine returning None."""
async def process_request(ws, request):
ws.process_request_ran = True
async with route(
self.url_map, "localhost", 0, process_request=process_request
) as server:
async with connect(get_uri(server) + "/") as client:
await self.assertEval(client, "ws.process_request_ran", "True")
async def test_process_request_function_returning_response(self):
"""Router supports a process_request function returning a response."""
def process_request(ws, request):
return ws.respond(http.HTTPStatus.FORBIDDEN, "Forbidden")
async with route(
self.url_map, "localhost", 0, process_request=process_request
) as server:
with self.assertRaises(InvalidStatus) as raised:
async with connect(get_uri(server) + "/"):
self.fail("did not raise")
self.assertEqual(
str(raised.exception),
"server rejected WebSocket connection: HTTP 403",
)
async def test_process_request_coroutine_returning_response(self):
"""Router supports a process_request coroutine returning a response."""
async def process_request(ws, request):
return ws.respond(http.HTTPStatus.FORBIDDEN, "Forbidden")
async with route(
self.url_map, "localhost", 0, process_request=process_request
) as server:
with self.assertRaises(InvalidStatus) as raised:
async with connect(get_uri(server) + "/"):
self.fail("did not raise")
self.assertEqual(
str(raised.exception),
"server rejected WebSocket connection: HTTP 403",
)
async def test_custom_router_factory(self):
"""Router supports a custom router factory."""
class MyRouter(Router):
async def handler(self, connection):
connection.my_router_ran = True
return await super().handler(connection)
async with route(
self.url_map, "localhost", 0, create_router=MyRouter
) as server:
async with connect(get_uri(server)) as client:
await self.assertEval(client, "ws.my_router_ran", "True")
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "this test requires Unix sockets")
class UnixRouterTests(EvalShellMixin, unittest.IsolatedAsyncioTestCase):
async def test_router_supports_unix_sockets(self):
"""Router supports Unix sockets."""
url_map = Map([Rule("/echo/<int:count>", endpoint=echo)])
with temp_unix_socket_path() as path:
async with unix_route(url_map, path):
async with unix_connect(path, "ws://localhost/echo/3") as client:
await client.send("hello")
messages = await alist(client)
self.assertEqual(messages, ["hello", "hello", "hello"])
|