File: test_async_base_view.py

package info (click to toggle)
strawberry-graphql 0.306.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 18,176 kB
  • sloc: javascript: 178,052; python: 65,643; sh: 33; makefile: 25
file content (80 lines) | stat: -rw-r--r-- 2,960 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import asyncio
from asyncio import sleep
from collections import Counter
from collections.abc import AsyncGenerator
from random import random
from typing import Any, cast

import pytest

from strawberry.http.async_base_view import AsyncBaseHTTPView


@pytest.mark.parametrize(
    "expected",
    [
        pytest.param(["last"], id="single_item"),
        pytest.param(["1st", "last"], id="two_items"),
        pytest.param(["1st", "2nd", "last"], id="three_items"),
    ],
)
async def test_stream_with_heartbeat_should_yield_items_correctly(
    expected: list[str],
) -> None:
    """
    Check that _stream_with_heartbeat forwards every source item, once, in order.

    For each parametrized list of source items the wrapped stream's output must
    satisfy three properties:

    1. Completeness: every source item shows up (the final one in particular).
    2. Uniqueness: every source item shows up exactly once.
    3. Order: source items keep their relative order.

    To stress the implementation, 100 wrapped streams are consumed concurrently
    and each consumer sleeps for a random sub-millisecond interval after every
    received item. The goal is to provoke race conditions between the drain
    task and the queue consumer that would drop, duplicate, or reorder items.

    Only the expected items are inspected; any other entries in the output
    (presumably heartbeat payloads -- confirm against the implementation) are
    ignored by all three checks.
    """

    # The index-based ordering check below is only meaningful for unique items.
    assert len(expected) == len(set(expected)), "Test requires unique elements"

    class MockAsyncBaseHTTPView:
        # Bare stand-in passed as ``self``; it only offers the one method
        # stubbed here, which always returns an empty string.
        def encode_multipart_data(self, *_: Any, **__: Any) -> str:
            return ""

    # A single stub instance is shared by all concurrent consumers.
    stub_view = MockAsyncBaseHTTPView()

    async def stream() -> AsyncGenerator[str, None]:
        # Source stream: emits the expected items in their given order.
        for value in expected:
            yield value

    async def collect() -> list[str]:
        # Wrap the source stream, then drain the wrapped stream into a list.
        wrapped = AsyncBaseHTTPView._stream_with_heartbeat(
            cast("AsyncBaseHTTPView", stub_view), stream, ""
        )
        received = []
        async for chunk in wrapped():
            received.append(chunk)
            # Random sleep to promote race conditions between concurrent tasks
            await sleep(random() / 1000)  # noqa: S311
        return received

    outputs = await asyncio.gather(*(collect() for _ in range(100)))

    for actual in outputs:
        occurrences = Counter(actual)

        # Validation 1: Item completeness
        missing_items = set(expected).difference(occurrences)
        assert not missing_items, f"Missing expected items: {list(missing_items)}"

        # Validation 2: No duplicates
        for item in expected:
            item_count = occurrences[item]
            assert item_count == 1, (
                f"Expected item '{item}' appears {item_count} times (should appear exactly once)"
            )

        # Validation 3: Preserved ordering (compare each adjacent expected pair)
        item_indices = {item: actual.index(item) for item in expected}
        for curr, next_item in zip(expected, expected[1:]):
            assert item_indices[curr] < item_indices[next_item], (
                f"Order incorrect: '{curr}' (at index {item_indices[curr]}) "
                f"should appear before '{next_item}' (at index {item_indices[next_item]})"
            )