File: test_autoasync.py

package info (click to toggle)
python-autocommand 2.2.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 236 kB
  • sloc: python: 1,052; sh: 15; makefile: 3
file content (218 lines) | stat: -rw-r--r-- 5,688 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# Copyright 2014-2016 Nathan West
#
# This file is part of autocommand.
#
# autocommand is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# autocommand is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with autocommand.  If not, see <http://www.gnu.org/licenses/>.

import pytest
from contextlib import closing, contextmanager
asyncio = pytest.importorskip('asyncio')
autoasync = pytest.importorskip('autocommand.autoasync').autoasync


class YieldOnce:
    '''
    Minimal awaitable: its __await__ generator yields a single None and then
    finishes. The tests in this file await it wherever a coroutine needs a
    suspension point.
    '''
    def __await__(self):
        # One bare value is handed to whatever is driving the coroutine; the
        # generator then returns, which completes the await.
        yield None


@contextmanager
def temporary_context_loop(loop):
    '''
    Set the given loop as the context loop (that is, the loop returned by
    asyncio.get_event_loop()) for the duration of the context. The loop is
    also the value bound by the with statement.

    On exit the previously installed loop is reinstalled. If there was no
    current loop on entry, the "no current loop" state is restored instead.
    '''
    try:
        old_loop = asyncio.get_event_loop()
    except RuntimeError:
        # get_event_loop() raises instead of returning a loop when there is
        # no current loop to hand back (for example after
        # set_event_loop(None)). Remember that state as None so the finally
        # clause below puts it back exactly as it was.
        old_loop = None

    asyncio.set_event_loop(loop)
    try:
        yield loop
    finally:
        asyncio.set_event_loop(old_loop)


@pytest.fixture
def new_loop():
    '''
    Fixture providing a freshly created event loop. The loop is not installed
    as the current loop here; it is closed once the test is finished with it.
    '''
    fresh = asyncio.new_event_loop()
    try:
        yield fresh
    finally:
        # Close on the way out whether the test passed or raised
        fresh.close()


@pytest.fixture
def context_loop():
    '''
    Fixture providing a brand new event loop that is installed as the current
    context event loop, so asyncio.get_event_loop() returns it while the
    fixture is active. On teardown the previous context loop is reinstated
    first, and then the new loop is closed.
    '''

    # Deliberately not built on the new_loop fixture: some tests request both
    # fixtures and need them to be two distinct loops.
    fresh = asyncio.new_event_loop()
    with closing(fresh), temporary_context_loop(fresh):
        yield fresh


def test_basic_autoasync(context_loop):
    '''
    Test that a coroutine function decorated with bare @autoasync can be
    called like a normal function: the call returns the coroutine's result,
    and tasks awaited inside it have run to completion by then.
    '''
    seen = set()

    async def record(before, after, result):
        # Leave one marker on each side of a suspension point
        seen.add(before)
        await YieldOnce()
        seen.add(after)

        return result

    @autoasync
    async def async_main():
        first_task = asyncio.create_task(record(1, 2, 1))
        second_task = asyncio.create_task(record(3, 4, 2))

        first_result = await first_task
        second_result = await second_task

        assert first_result == 1
        assert second_result == 2

        return 3

    outcome = async_main()
    assert outcome == 3
    assert seen == {1, 2, 3, 4}


def test_custom_loop(context_loop, new_loop):
    '''
    Test that, when loop= is explicitly given to autoasync, calling the
    decorated function does not run a task that was queued on the context
    loop.
    '''
    bad_coro_runs = []

    async def bad_coro():
        # Must never execute; leaves a trace if it does
        bad_coro_runs.append(True)
        await YieldOnce()

    # TODO: this fires a "task wasn't awaited" warning; figure out how to
    # suppress
    context_loop.create_task(bad_coro())

    @autoasync(loop=new_loop)
    async def async_main():
        for _ in range(2):
            await YieldOnce()
        return 3

    assert async_main() == 3
    assert bad_coro_runs == []


def test_pass_loop(context_loop):
    '''
    Test that, with pass_loop=True, the decorated function receives the
    current context event loop as its loop argument
    '''
    @autoasync(pass_loop=True)
    async def async_main(loop):
        return loop

    received = async_main()
    assert received is asyncio.get_event_loop()


def test_pass_loop_prior_argument(context_loop):
    '''
    Test that, when loop is the first positional parameter, the arguments
    supplied by the caller still end up in the remaining parameters
    '''
    @autoasync(pass_loop=True)
    async def async_main(loop, argument):
        return loop, argument

    result = async_main(10)
    passed_loop, passed_value = result

    assert passed_loop is asyncio.get_event_loop()
    assert passed_value == 10


def test_pass_loop_kwarg_only(context_loop):
    '''
    Test that pass_loop=True also works when loop is a keyword-only
    parameter, alongside another keyword-only argument given by the caller
    '''
    @autoasync(pass_loop=True)
    async def async_main(*, loop, argument):
        await YieldOnce()
        return loop, argument

    result = async_main(argument=10)
    passed_loop, passed_value = result

    assert passed_loop is asyncio.get_event_loop()
    assert passed_value == 10


def test_run_forever(context_loop):
    '''
    Test that, with forever=True, tasks started by the decorated coroutine
    are still serviced after the coroutine itself has finished: the call only
    comes back once a background task stops the loop, by which time the
    earlier background task has set its flag.
    '''
    value_was_set = False

    async def set_value_after(delay):
        nonlocal value_was_set
        await asyncio.sleep(delay)
        value_was_set = True

    async def stop_loop_after(delay):
        await asyncio.sleep(delay)
        context_loop.stop()

    @autoasync(forever=True)
    async def async_main():
        # The flag is set at 0.1s, strictly before the loop is stopped at 0.2s
        asyncio.create_task(set_value_after(0.1))
        asyncio.create_task(stop_loop_after(0.2))
        await YieldOnce()

    async_main()
    assert value_was_set


def test_run_forever_func(context_loop):
    '''
    Same scenario as the forever=True coroutine case, but the decorated
    callable is a plain (non-async) function that only schedules background
    tasks. The call returns after one of those tasks stops the loop, and the
    other task must have set its flag by then.
    '''
    flag_raised = False

    async def set_value_after(delay):
        nonlocal flag_raised
        await asyncio.sleep(delay)
        flag_raised = True

    async def stop_loop_after(delay):
        await asyncio.sleep(delay)
        context_loop.stop()

    @autoasync(forever=True)
    def main_func():
        # The flag is set at 0.1s, strictly before the loop is stopped at 0.2s
        asyncio.create_task(set_value_after(0.1))
        asyncio.create_task(stop_loop_after(0.2))

    main_func()
    assert flag_raised


def test_defered_loop(context_loop, new_loop):
    '''
    Test deferred loop lookup: when no loop= is given to autoasync and a
    different event loop is installed with set_event_loop AFTER the decorator
    has been applied, the decorated function uses the loop that is current at
    call time.
    '''
    @autoasync(pass_loop=True)
    async def async_main(loop):
        await YieldOnce()
        return loop

    # The decorator above was applied while context_loop was current; the
    # call below happens while new_loop is current.
    with temporary_context_loop(new_loop):
        used_loop = async_main()
        assert used_loop is new_loop
        assert used_loop is asyncio.get_event_loop()
        assert used_loop is not context_loop

    # Outside the with block the current loop is no longer new_loop
    assert used_loop is not asyncio.get_event_loop()