File: test_requests.py

package info (click to toggle)
starlette 0.50.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,072 kB
  • sloc: python: 13,532; sh: 35; makefile: 6
file content (649 lines) | stat: -rw-r--r-- 22,744 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
from __future__ import annotations

import sys
from collections.abc import Iterator
from typing import Any

import anyio
import pytest

from starlette.datastructures import URL, Address, State
from starlette.requests import ClientDisconnect, Request
from starlette.responses import JSONResponse, PlainTextResponse, Response
from starlette.types import Message, Receive, Scope, Send
from tests.types import TestClientFactory


def test_request_url(test_client_factory: TestClientFactory) -> None:
    """`Request.method` and `Request.url` mirror the request that was sent."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        # Echo the HTTP method and the stringified URL back to the caller.
        req = Request(scope, receive)
        payload = {"method": req.method, "url": str(req.url)}
        await JSONResponse(payload)(scope, receive, send)

    test_client = test_client_factory(app)

    # A relative path is reported against the test client's base URL.
    relative = test_client.get("/123?a=abc")
    assert relative.json() == {"method": "GET", "url": "http://testserver/123?a=abc"}

    # An absolute URL keeps its own scheme, host and port.
    absolute = test_client.get("https://example.org:123/")
    assert absolute.json() == {"method": "GET", "url": "https://example.org:123/"}


def test_request_query_params(test_client_factory: TestClientFactory) -> None:
    """The query string is exposed as a mapping via `Request.query_params`."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        payload = {"params": dict(req.query_params)}
        await JSONResponse(payload)(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.get("/?a=123&b=456")

    expected_params = {"a": "123", "b": "456"}
    assert resp.json() == {"params": expected_params}


@pytest.mark.skipif(
    "brotli" in sys.modules or "brotlicffi" in sys.modules,
    reason='urllib3 includes "br" to the "accept-encoding" headers.',
)
def test_request_headers(test_client_factory: TestClientFactory) -> None:
    """`Request.headers` contains the headers sent by the test client."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        payload = {"headers": dict(req.headers)}
        await JSONResponse(payload)(scope, receive, send)

    # The explicit "host" override plus the headers the test client adds itself.
    expected_headers = {
        "host": "example.org",
        "user-agent": "testclient",
        "accept-encoding": "gzip, deflate",
        "accept": "*/*",
        "connection": "keep-alive",
    }

    test_client = test_client_factory(app)
    resp = test_client.get("/", headers={"host": "example.org"})
    assert resp.json() == {"headers": expected_headers}


@pytest.mark.parametrize(
    "scope,expected_client",
    [
        ({"client": ["client", 42]}, Address("client", 42)),
        ({"client": None}, None),
        ({}, None),
    ],
)
def test_request_client(scope: Scope, expected_client: Address | None) -> None:
    """
    `Request.client` is an `Address` when the scope carries a client entry,
    and `None` when the entry is `None` or missing.
    """
    # pytest passes parametrized values without copying them, so build a new
    # scope instead of mutating the shared dict in place.
    http_scope = {**scope, "type": "http"}  # "type" is required by Request's constructor
    assert Request(http_scope).client == expected_client


def test_request_body(test_client_factory: TestClientFactory) -> None:
    """`Request.body()` returns the payload for empty, JSON and text requests."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        raw = await req.body()
        await JSONResponse({"body": raw.decode()})(scope, receive, send)

    test_client = test_client_factory(app)

    # No payload at all.
    empty_resp = test_client.get("/")
    assert empty_resp.json() == {"body": ""}

    # JSON payload: accept both the compact and the spaced serialization.
    json_resp = test_client.post("/", json={"a": "123"})
    accepted = ({"body": '{"a":"123"}'}, {"body": '{"a": "123"}'})
    assert json_resp.json() in accepted

    text_resp = test_client.post("/", data="abc")  # type: ignore
    assert text_resp.json() == {"body": "abc"}


def test_request_stream(test_client_factory: TestClientFactory) -> None:
    """Concatenating the chunks of `Request.stream()` yields the full payload."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        # Drain the stream and glue the chunks back together.
        pieces = [piece async for piece in req.stream()]
        payload = {"body": b"".join(pieces).decode()}
        await JSONResponse(payload)(scope, receive, send)

    test_client = test_client_factory(app)

    empty_resp = test_client.get("/")
    assert empty_resp.json() == {"body": ""}

    # Accept both the compact and the spaced JSON serialization.
    json_resp = test_client.post("/", json={"a": "123"})
    accepted = ({"body": '{"a":"123"}'}, {"body": '{"a": "123"}'})
    assert json_resp.json() in accepted

    text_resp = test_client.post("/", data="abc")  # type: ignore
    assert text_resp.json() == {"body": "abc"}


def test_request_form_urlencoded(test_client_factory: TestClientFactory) -> None:
    """Awaiting `Request.form()` parses form data, including spaces and '@'."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        parsed = await req.form()
        await JSONResponse({"form": dict(parsed)})(scope, receive, send)

    fields = {"abc": "123 @"}
    test_client = test_client_factory(app)
    resp = test_client.post("/", data=fields)
    assert resp.json() == {"form": {"abc": "123 @"}}


def test_request_form_context_manager(test_client_factory: TestClientFactory) -> None:
    """`Request.form()` can also be used as an async context manager."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        # The response is sent while the form is still open.
        async with req.form() as parsed:
            payload = {"form": dict(parsed)}
            await JSONResponse(payload)(scope, receive, send)

    fields = {"abc": "123 @"}
    test_client = test_client_factory(app)
    resp = test_client.post("/", data=fields)
    assert resp.json() == {"form": {"abc": "123 @"}}


def test_request_body_then_stream(test_client_factory: TestClientFactory) -> None:
    """After `body()` has been read, `stream()` still yields the same payload."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        whole = await req.body()
        # Streaming after body() is expected to replay the payload.
        streamed = b"".join([piece async for piece in req.stream()])
        payload = {"body": whole.decode(), "stream": streamed.decode()}
        await JSONResponse(payload)(scope, receive, send)

    test_client = test_client_factory(app)

    resp = test_client.post("/", data="abc")  # type: ignore
    assert resp.json() == {"body": "abc", "stream": "abc"}


def test_request_stream_then_body(test_client_factory: TestClientFactory) -> None:
    """Calling `body()` after the stream was consumed raises `RuntimeError`."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        streamed = b"".join([piece async for piece in req.stream()])
        try:
            whole = await req.body()
        except RuntimeError:
            # Report the failure through a marker value the test can assert on.
            whole = b"<stream consumed>"
        payload = {"body": whole.decode(), "stream": streamed.decode()}
        await JSONResponse(payload)(scope, receive, send)

    test_client = test_client_factory(app)

    resp = test_client.post("/", data="abc")  # type: ignore
    assert resp.json() == {"body": "<stream consumed>", "stream": "abc"}


def test_request_json(test_client_factory: TestClientFactory) -> None:
    """`Request.json()` decodes a JSON request body."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        decoded = await req.json()
        await JSONResponse({"json": decoded})(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.post("/", json={"a": "123"})
    assert resp.json() == {"json": {"a": "123"}}


def test_request_scope_interface() -> None:
    """
    A Request built from a bare scope behaves like a `Mapping` over that
    scope: item access, conversion to `dict` and `len()` all work.
    """
    expected = {"type": "http", "method": "GET", "path": "/abc/"}
    # Hand Request its own copy so `expected` stays an independent reference value.
    req = Request(dict(expected))

    assert req["method"] == "GET"
    assert dict(req) == expected
    assert len(req) == 3


def test_request_raw_path(test_client_factory: TestClientFactory) -> None:
    """The scope carries a decoded `path` and the undecoded bytes in `raw_path`."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        decoded, undecoded = req.scope["path"], req.scope["raw_path"]
        text = f"{decoded}, {undecoded}"
        await PlainTextResponse(text)(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.get("/he%2Fllo")
    # `raw_path` is bytes, so its repr shows up in the formatted text.
    assert resp.text == "/he/llo, b'/he%2Fllo'"


def test_request_without_setting_receive(
    test_client_factory: TestClientFactory,
) -> None:
    """
    A Request created without a receive channel cannot read the body:
    `.json()` raises `RuntimeError`.
    """

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        # Deliberately omit `receive` when building the request.
        req = Request(scope)
        try:
            result = await req.json()
        except RuntimeError:
            result = "Receive channel not available"
        await JSONResponse({"json": result})(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.post("/", json={"a": "123"})
    assert resp.json() == {"json": "Receive channel not available"}


def test_request_disconnect(
    anyio_backend_name: str,
    anyio_backend_options: dict[str, Any],
) -> None:
    """
    Reading the body must raise `ClientDisconnect` when the receive channel
    reports an `http.disconnect` message.
    """

    async def disconnecting_receive() -> Message:
        # Simulates a client that went away before sending any body.
        return {"type": "http.disconnect"}

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        await Request(scope, receive).body()

    http_scope = {"type": "http", "method": "POST", "path": "/"}

    with pytest.raises(ClientDisconnect):
        # The app never sends anything, so `None` stands in for the send channel.
        anyio.run(
            app,  # type: ignore
            http_scope,
            disconnecting_receive,
            None,
            backend=anyio_backend_name,
            backend_options=anyio_backend_options,
        )


def test_request_is_disconnected(test_client_factory: TestClientFactory) -> None:
    """
    `is_disconnected()` is false right after the body was read and becomes
    true once the response has been sent.
    """
    seen_after_response = None

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        nonlocal seen_after_response

        req = Request(scope, receive)
        raw = await req.body()
        # First probe: taken before any response is sent.
        seen_before_response = await req.is_disconnected()
        payload = {"body": raw.decode(), "disconnected": seen_before_response}
        await JSONResponse(payload)(scope, receive, send)
        # Second probe: taken after the response has gone out.
        seen_after_response = await req.is_disconnected()

    test_client = test_client_factory(app)
    resp = test_client.post("/", content="foo")
    assert resp.json() == {"body": "foo", "disconnected": False}
    assert seen_after_response


def test_request_state_object() -> None:
    """Attributes on a `State` can be set, read and deleted."""
    initial = {"old": "foo"}
    state = State(initial)

    state.new = "value"
    assert state.new == "value"

    del state.new

    # A deleted attribute is no longer readable.
    with pytest.raises(AttributeError):
        state.new


def test_request_state(test_client_factory: TestClientFactory) -> None:
    """A value stored on `request.state` can be read back from it."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        req.state.example = 123
        payload = {"state.example": req.state.example}
        await JSONResponse(payload)(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.get("/123?a=abc")
    assert resp.json() == {"state.example": 123}


def test_request_cookies(test_client_factory: TestClientFactory) -> None:
    """A cookie set by one response is visible in `Request.cookies` on the next request."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        cookie_value = req.cookies.get("mycookie")
        if not cookie_value:
            # First visit: greet and hand out the cookie.
            reply = Response("Hello, world!", media_type="text/plain")
            reply.set_cookie("mycookie", "Hello, cookies!")
        else:
            # Later visits: echo the cookie value back.
            reply = Response(cookie_value, media_type="text/plain")

        await reply(scope, receive, send)

    # Both requests go through the same client so the cookie is carried over.
    test_client = test_client_factory(app)

    first = test_client.get("/")
    assert first.text == "Hello, world!"

    second = test_client.get("/")
    assert second.text == "Hello, cookies!"


def test_cookie_lenient_parsing(test_client_factory: TestClientFactory) -> None:
    """
    Cookie headers that violate the spec must still be parsed.

    The sample header is modelled on a cookie set by Okta, a well-known
    authorization service; in practice such spec-invalid cookies are common.
    """
    tough_cookie = (
        "provider-oauth-nonce=validAsciiblabla; "
        'okta-oauth-redirect-params={"responseType":"code","state":"somestate",'
        '"nonce":"somenonce","scopes":["openid","profile","email","phone"],'
        '"urls":{"issuer":"https://subdomain.okta.com/oauth2/authServer",'
        '"authorizeUrl":"https://subdomain.okta.com/oauth2/authServer/v1/authorize",'
        '"userinfoUrl":"https://subdomain.okta.com/oauth2/authServer/v1/userinfo"}}; '
        "importantCookie=importantValue; sessionCookie=importantSessionValue"
    )
    expected_keys = {
        "importantCookie",
        "okta-oauth-redirect-params",
        "provider-oauth-nonce",
        "sessionCookie",
    }

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        await JSONResponse({"cookies": req.cookies})(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.get("/", headers={"cookie": tough_cookie})
    parsed_cookies = resp.json()["cookies"]

    # All four cookies survive, including the JSON-valued one in the middle.
    assert len(parsed_cookies) == 4
    assert set(parsed_cookies) == expected_keys


# These test cases copied from Tornado's implementation
@pytest.mark.parametrize(
    "set_cookie,expected",
    [
        ("chips=ahoy; vienna=finger", {"chips": "ahoy", "vienna": "finger"}),
        # Every semicolon is a delimiter, even one inside quotes.
        (
            'keebler="E=mc2; L=\\"Loves\\"; fudge=\\012;"',
            {"keebler": '"E=mc2', "L": '\\"Loves\\"', "fudge": "\\012", "": '"'},
        ),
        # Illegal cookie with an '=' inside an unquoted value.
        ("keebler=E=mc2", {"keebler": "E=mc2"}),
        # ':' in the cookie name.
        ("key:term=value:term", {"key:term": "value:term"}),
        # '[' and ']' in values.
        ("a=b; c=[; d=r; f=h", {"a": "b", "c": "[", "d": "r", "f": "h"}),
        # Cookies that RFC6265 allows.
        ("a=b; Domain=example.com", {"a": "b", "Domain": "example.com"}),
        # When a name repeats, only the last occurrence is kept.
        ("a=b; h=i; a=c", {"a": "c", "h": "i"}),
    ],
)
def test_cookies_edge_cases(
    set_cookie: str,
    expected: dict[str, str],
    test_client_factory: TestClientFactory,
) -> None:
    """`Request.cookies` parses each edge-case header into the expected mapping."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        await JSONResponse({"cookies": req.cookies})(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.get("/", headers={"cookie": set_cookie})
    parsed_cookies = resp.json()["cookies"]
    assert parsed_cookies == expected


@pytest.mark.parametrize(
    "set_cookie,expected",
    [
        # A chunk without an equals sign shows up as an unnamed value, per
        # https://bugzilla.mozilla.org/show_bug.cgi?id=169091
        (
            "abc=def; unnamed; django_language=en",
            {"": "unnamed", "abc": "def", "django_language": "en"},
        ),
        # Even a lone double quote may be an unnamed value.
        ('a=b; "; c=d', {"a": "b", "": '"', "c": "d"}),
        # Spaces in names and values, plus an equals sign inside a value.
        ("a b c=d e = f; gh=i", {"a b c": "d e = f", "gh": "i"}),
        # More characters the spec forbids.
        ('a   b,c<>@:/[]?{}=d  "  =e,f g', {"a   b,c<>@:/[]?{}": 'd  "  =e,f g'}),
        # Unicode characters. The spec only allows ASCII.
        # ("saint=André Bessette", {"saint": "André Bessette"}),
        # Browsers do not send extra whitespace or semicolons in Cookie
        # headers, but the parser should treat whitespace the same way
        # document.cookie does.
        ("  =  b  ;  ;  =  ;   c  =  ;  ", {"": "b", "c": ""}),
    ],
)
def test_cookies_invalid(
    set_cookie: str,
    expected: dict[str, str],
    test_client_factory: TestClientFactory,
) -> None:
    """
    Cookie strings that break the RFC6265 spec, yet are sent by browsers when
    set through document.cookie, are still parsed into the expected mapping.
    """

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        await JSONResponse({"cookies": req.cookies})(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.get("/", headers={"cookie": set_cookie})
    parsed_cookies = resp.json()["cookies"]
    assert parsed_cookies == expected


def test_multiple_cookie_headers(test_client_factory: TestClientFactory) -> None:
    """Cookies spread over several `cookie` headers are merged into one mapping."""

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        # Overwrite the incoming headers with three separate cookie headers.
        cookie_headers = [(b"cookie", b"a=abc"), (b"cookie", b"b=def"), (b"cookie", b"c=ghi")]
        scope["headers"] = cookie_headers
        req = Request(scope, receive)
        await JSONResponse({"cookies": req.cookies})(scope, receive, send)

    test_client = test_client_factory(app)
    parsed_cookies = test_client.get("/").json()["cookies"]
    assert parsed_cookies == {"a": "abc", "b": "def", "c": "ghi"}


def test_chunked_encoding(test_client_factory: TestClientFactory) -> None:
    """A body supplied as an iterator of chunks is reassembled by `Request.body()`."""

    def post_body() -> Iterator[bytes]:
        # Two chunks that must arrive as the single payload b"foobar".
        yield from (b"foo", b"bar")

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        req = Request(scope, receive)
        raw = await req.body()
        await JSONResponse({"body": raw.decode()})(scope, receive, send)

    test_client = test_client_factory(app)

    resp = test_client.post("/", data=post_body())  # type: ignore
    assert resp.json() == {"body": "foobar"}


def test_request_send_push_promise(test_client_factory: TestClientFactory) -> None:
    """
    With the push extension present and a send channel available,
    `.send_push_promise()` completes and the normal response is still delivered.
    """

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        # Mark the server as push-enabled.
        scope["extensions"]["http.response.push"] = {}

        req = Request(scope, receive, send)
        await req.send_push_promise("/style.css")

        await JSONResponse({"json": "OK"})(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.get("/")
    assert resp.json() == {"json": "OK"}


def test_request_send_push_promise_without_push_extension(
    test_client_factory: TestClientFactory,
) -> None:
    """
    When the server does not advertise the `http.response.push` extension,
    .send_push_promise() is a no-op and the response goes out as usual.
    """

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        # No push extension is registered and no send channel is passed.
        req = Request(scope)
        await req.send_push_promise("/style.css")

        await JSONResponse({"json": "OK"})(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.get("/")
    assert resp.json() == {"json": "OK"}


def test_request_send_push_promise_without_setting_send(
    test_client_factory: TestClientFactory,
) -> None:
    """
    A Request created without a send channel cannot push:
    .send_push_promise() raises `RuntimeError` on a push-enabled server.
    """

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        # Mark the server as push-enabled.
        scope["extensions"]["http.response.push"] = {}

        req = Request(scope)
        try:
            await req.send_push_promise("/style.css")
        except RuntimeError:
            outcome = "Send channel not available"
        else:
            outcome = "OK"
        await JSONResponse({"json": outcome})(scope, receive, send)

    test_client = test_client_factory(app)
    resp = test_client.get("/")
    assert resp.json() == {"json": "Send channel not available"}


@pytest.mark.parametrize(
    "messages",
    [
        [{"body": b"123", "more_body": True}, {"body": b""}],
        [{"body": b"", "more_body": True}, {"body": b"123"}],
        [{"body": b"12", "more_body": True}, {"body": b"3"}],
        [
            {"body": b"123", "more_body": True},
            {"body": b"", "more_body": True},
            {"body": b""},
        ],
    ],
)
@pytest.mark.anyio
async def test_request_rcv(messages: list[Message]) -> None:
    """However the payload is split across messages, `body()` returns b"123"."""
    # Work on a copy so popping does not drain the parametrized list itself.
    pending = list(messages)

    async def rcv() -> Message:
        # Serve the queued messages in order, tagging each as an http.request.
        next_message = pending.pop(0)
        return {"type": "http.request", **next_message}

    req = Request({"type": "http"}, rcv)
    received = await req.body()

    assert received == b"123"


@pytest.mark.anyio
async def test_request_stream_called_twice() -> None:
    """
    Two iterators obtained from `Request.stream()` draw from the same receive
    channel: chunks are handed out in arrival order to whichever iterator asks
    next, each iterator then yields one final empty chunk, and after that both
    raise `StopAsyncIteration`.
    """
    messages: list[Message] = [
        {"type": "http.request", "body": b"1", "more_body": True},
        {"type": "http.request", "body": b"2", "more_body": True},
        {"type": "http.request", "body": b"3"},
    ]

    async def rcv() -> Message:
        return messages.pop(0)

    request = Request({"type": "http"}, rcv)

    s1 = request.stream()
    s2 = request.stream()

    msg = await s1.__anext__()
    assert msg == b"1"

    msg = await s2.__anext__()
    assert msg == b"2"

    msg = await s1.__anext__()
    assert msg == b"3"

    # at this point we've consumed the entire body
    # so we should not wait for more body (which would hang us forever)
    msg = await s1.__anext__()
    assert msg == b""
    msg = await s2.__anext__()
    assert msg == b""

    # and now both streams are exhausted; no `assert` on the awaited value,
    # so a missing exception is reported by pytest.raises itself
    with pytest.raises(StopAsyncIteration):
        await s2.__anext__()
    with pytest.raises(StopAsyncIteration):
        await s1.__anext__()


def test_request_url_outside_starlette_context(test_client_factory: TestClientFactory) -> None:
    """
    Calling `Request.url_for()` in a bare ASGI app (no Starlette application
    or router involved) raises `RuntimeError` with an explanatory message.
    """

    async def app(scope: Scope, receive: Receive, send: Send) -> None:
        request = Request(scope, receive)
        request.url_for("index")

    client = test_client_factory(app)
    with pytest.raises(
        RuntimeError,
        # `match` is a regular expression, so the trailing period is escaped
        # to require a literal "." rather than any character.
        match=r"The `url_for` method can only be used inside a Starlette application or with a router\.",
    ):
        client.get("/")


def test_request_url_starlette_context(test_client_factory: TestClientFactory) -> None:
    """`Request.url_for()` works in middleware wrapped around a Starlette app."""
    from starlette.applications import Starlette
    from starlette.middleware import Middleware
    from starlette.routing import Route
    from starlette.types import ASGIApp

    resolved_url = None

    # The function name doubles as the route name looked up by url_for() below.
    async def homepage(request: Request) -> Response:
        return PlainTextResponse("Hello, world!")

    class CustomMiddleware:
        """Records the URL that `url_for("homepage")` resolves to, then delegates."""

        def __init__(self, app: ASGIApp) -> None:
            self.app = app

        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            nonlocal resolved_url
            resolved_url = Request(scope, receive).url_for("homepage")
            await self.app(scope, receive, send)

    routes = [Route("/home", homepage)]
    middleware = [Middleware(CustomMiddleware)]
    app = Starlette(routes=routes, middleware=middleware)

    test_client = test_client_factory(app)
    test_client.get("/home")
    assert resolved_url == URL("http://testserver/home")