File: test_simple_pub_sub.py

package info (click to toggle)
graphql-core 3.2.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,384 kB
  • sloc: python: 45,812; makefile: 26; sh: 13
file content (96 lines) | stat: -rw-r--r-- 2,975 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from asyncio import sleep
from inspect import isawaitable

from pytest import mark, raises

from graphql.pyutils import SimplePubSub


def describe_simple_pub_sub():
    @mark.asyncio
    async def subscribe_async_iterator_mock():
        pubsub = SimplePubSub()
        iterator = pubsub.get_subscriber()

        # Queue up publishes
        assert pubsub.emit("Apple") is True
        assert pubsub.emit("Banana") is True

        # Read payloads
        assert await iterator.__anext__() == "Apple"
        assert await iterator.__anext__() == "Banana"

        # Read ahead
        i3 = await iterator.__anext__()
        assert isawaitable(i3)
        i4 = await iterator.__anext__()
        assert isawaitable(i4)

        # Publish
        assert pubsub.emit("Coconut") is True
        assert pubsub.emit("Durian") is True

        # Await out of order to get correct results
        assert await i4 == "Durian"
        assert await i3 == "Coconut"

        # Read ahead
        i5 = iterator.__anext__()

        # Terminate queue
        await iterator.aclose()

        # Publish is not caught after terminate
        assert pubsub.emit("Fig") is False

        # Find that cancelled read-ahead got a "done" result
        with raises(StopAsyncIteration):
            await i5

        # And next returns empty completion value
        with raises(StopAsyncIteration):
            await iterator.__anext__()

    @mark.asyncio
    async def iterator_aclose_empties_push_queue():
        pubsub = SimplePubSub()
        assert not pubsub.subscribers
        iterator = pubsub.get_subscriber()
        assert len(pubsub.subscribers) == 1
        assert iterator.listening
        for value in range(3):
            pubsub.emit(value)
        await sleep(0)
        assert iterator.push_queue.qsize() == 3
        assert iterator.pull_queue.qsize() == 0
        await iterator.aclose()
        assert not pubsub.subscribers
        assert iterator.push_queue.qsize() == 0
        assert iterator.pull_queue.qsize() == 0
        assert not iterator.listening

    @mark.asyncio
    async def iterator_aclose_empties_pull_queue():
        pubsub = SimplePubSub()
        assert not pubsub.subscribers
        iterator = pubsub.get_subscriber()
        assert len(pubsub.subscribers) == 1
        assert iterator.listening
        for _n in range(3):
            await iterator.__anext__()
        assert iterator.push_queue.qsize() == 0
        assert iterator.pull_queue.qsize() == 3
        await iterator.aclose()
        assert not pubsub.subscribers
        assert iterator.push_queue.qsize() == 0
        assert iterator.pull_queue.qsize() == 0
        assert not iterator.listening

    @mark.asyncio
    async def iterator_aclose_is_idempotent():
        pubsub = SimplePubSub()
        iterator = pubsub.get_subscriber()
        assert iterator.listening
        for n in range(3):
            await iterator.aclose()
            assert not iterator.listening