File: test_benchmarks.py

package info (click to toggle)
python-janus 2.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 220 kB
  • sloc: python: 1,805; makefile: 25
file content (152 lines) | stat: -rw-r--r-- 3,234 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import asyncio
import sys
import janus

if sys.version_info >= (3, 11):
    from asyncio import Runner
else:
    from backports.asyncio.runner import Runner


def test_bench_sync_put_async_get(benchmark):
    """Benchmark a thread producing via ``sync_q`` to an asyncio consumer.

    One measured run performs 100 rounds. In each round ``threaded`` is
    submitted with ``run_in_executor(None, ...)`` and puts ``0..4`` through
    ``q.sync_q`` while the coroutine awaits the same values from
    ``q.async_q`` and asserts they arrive in order.

    Args:
        benchmark: Fixture applied as a decorator to the measured function
            (presumably supplied by a pytest benchmark plugin, which is
            expected to invoke the function it decorates).
    """
    q: janus.Queue

    async def init():
        # Build the queue from inside a coroutine driven by the runner.
        nonlocal q
        q = janus.Queue()

    def threaded():
        # Producer half: executed in an executor thread, not on the loop.
        for i in range(5):
            q.sync_q.put(i)

    async def go():
        # The round counter is unused, hence ``_``; the inner loop variable
        # carries the value expected from the queue.
        for _ in range(100):
            f = asyncio.get_running_loop().run_in_executor(None, threaded)
            for expected in range(5):
                val = await q.async_q.get()
                assert val == expected

            # Wait for the producer before checking the queue was drained.
            await f
            assert q.async_q.empty()

    async def finish():
        q.close()
        await q.wait_closed()

    # NOTE: the loop runs in debug mode, so timings include debug overhead.
    with Runner(debug=True) as runner:
        runner.run(init())
        try:

            @benchmark
            def _run():
                runner.run(go())

        finally:
            # Close the queue even when the measured body raised, so a
            # failing run does not leave it open while the runner shuts down.
            runner.run(finish())


def test_bench_sync_put_async_join(benchmark):
    """Benchmark ``async_q.join()`` after filling the queue via ``sync_q``.

    One measured run performs 100 rounds. In each round ``0..4`` are put
    through ``q.sync_q`` directly from the coroutine, a task is started that
    sleeps 10 ms and then drains the queue (calling ``task_done()`` for every
    item), and the coroutine awaits ``q.async_q.join()`` followed by the task.

    Args:
        benchmark: Fixture applied as a decorator to the measured function
            (presumably supplied by a pytest benchmark plugin, which is
            expected to invoke the function it decorates).
    """
    q: janus.Queue

    async def init():
        # Build the queue from inside a coroutine driven by the runner.
        nonlocal q
        q = janus.Queue()

    async def go():
        # The round counter is unused, hence ``_``.
        for _ in range(100):
            for item in range(5):
                q.sync_q.put(item)

            async def do_work():
                # Delay so that ``join()`` below is already waiting when the
                # items start being marked as done.
                await asyncio.sleep(0.01)
                while not q.async_q.empty():
                    await q.async_q.get()
                    q.async_q.task_done()

            task = asyncio.create_task(do_work())

            await q.async_q.join()
            await task

    async def finish():
        q.close()
        await q.wait_closed()

    # NOTE: the loop runs in debug mode, so timings include debug overhead.
    with Runner(debug=True) as runner:
        runner.run(init())
        try:

            @benchmark
            def _run():
                runner.run(go())

        finally:
            # Close the queue even when the measured body raised, so a
            # failing run does not leave it open while the runner shuts down.
            runner.run(finish())


def test_bench_async_put_sync_get(benchmark):
    """Benchmark an asyncio producer feeding a thread that uses ``sync_q``.

    One measured run performs 100 rounds. In each round ``threaded`` is
    submitted with ``run_in_executor(None, ...)`` and reads five items from
    ``q.sync_q``, asserting they are ``0..4`` in order, while the coroutine
    puts those values through ``q.async_q``.

    Args:
        benchmark: Fixture applied as a decorator to the measured function
            (presumably supplied by a pytest benchmark plugin, which is
            expected to invoke the function it decorates).
    """
    q: janus.Queue

    async def init():
        # Build the queue from inside a coroutine driven by the runner.
        nonlocal q
        q = janus.Queue()

    def threaded():
        # Consumer half: executed in an executor thread, not on the loop.
        for i in range(5):
            val = q.sync_q.get()
            assert val == i

    async def go():
        # The round counter is unused, hence ``_``.
        for _ in range(100):
            f = asyncio.get_running_loop().run_in_executor(None, threaded)
            for item in range(5):
                await q.async_q.put(item)

            # Awaiting the future also surfaces assertion errors raised in
            # the consumer thread.
            await f
            assert q.async_q.empty()

    async def finish():
        q.close()
        await q.wait_closed()

    # NOTE: the loop runs in debug mode, so timings include debug overhead.
    with Runner(debug=True) as runner:
        runner.run(init())
        try:

            @benchmark
            def _run():
                runner.run(go())

        finally:
            # Close the queue even when the measured body raised, so a
            # failing run does not leave it open while the runner shuts down.
            runner.run(finish())


def test_sync_join_async_done(benchmark):
    """Benchmark ``sync_q.join()`` released by ``async_q.task_done()``.

    One measured run performs 100 rounds. In each round ``threaded`` is
    submitted with ``run_in_executor(None, ...)``; it puts ``0..4`` through
    ``q.sync_q`` and then calls ``q.sync_q.join()``. The coroutine awaits the
    values from ``q.async_q``, asserts their order, and calls ``task_done()``
    for each one before awaiting the thread's future.

    Args:
        benchmark: Fixture applied as a decorator to the measured function
            (presumably supplied by a pytest benchmark plugin, which is
            expected to invoke the function it decorates).
    """
    q: janus.Queue

    async def init():
        # Build the queue from inside a coroutine driven by the runner.
        nonlocal q
        q = janus.Queue()

    def threaded():
        # Producer half: executed in an executor thread, not on the loop.
        for i in range(5):
            q.sync_q.put(i)
        q.sync_q.join()

    async def go():
        # The round counter is unused, hence ``_``; the inner loop variable
        # carries the value expected from the queue.
        for _ in range(100):
            f = asyncio.get_running_loop().run_in_executor(None, threaded)
            for expected in range(5):
                val = await q.async_q.get()
                assert val == expected
                q.async_q.task_done()

            # Wait for the producer before checking the queue was drained.
            await f
            assert q.async_q.empty()

    async def finish():
        q.close()
        await q.wait_closed()

    # NOTE: the loop runs in debug mode, so timings include debug overhead.
    with Runner(debug=True) as runner:
        runner.run(init())
        try:

            @benchmark
            def _run():
                runner.run(go())

        finally:
            # Close the queue even when the measured body raised. If ``go``
            # fails before every ``task_done()`` call, the worker thread is
            # still inside ``sync_q.join()``; closing presumably releases it
            # (TODO confirm against the janus ``close()`` semantics).
            runner.run(finish())