File: test_async_util.py

package info (click to toggle)
cylc-flow 8.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 14,368 kB
  • sloc: python: 87,751; sh: 17,109; sql: 233; xml: 171; javascript: 78; lisp: 55; makefile: 11
file content (266 lines) | stat: -rw-r--r-- 6,742 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import asyncio
from inspect import signature
import logging
from pathlib import Path
from random import random

import pytest

from cylc.flow.async_util import (
    pipe,
    asyncqgen,
    scandir,
)

# Logger shared by the pipe functions below. test_pipe_concurrent captures
# the records sent to it in order to inspect the order the steps ran in.
LOG = logging.getLogger('test')


@pipe()
async def a_range(n):
    """Yield each integer in ``range(n)``, logging once per item yielded."""
    # the logged text only depends on ``n`` so it is the same for every item
    message = f'a_range({n})'
    for value in range(n):
        LOG.info(message)
        yield value


@pipe
async def even(x):
    """Log the call then report whether ``x`` is an even number."""
    LOG.info(f'even({x})')
    is_even = x % 2 == 0
    return is_even


@pipe
async def mult(x, y, kwarg='useless kwarg'):
    """Log the call then return the product of ``x`` and ``y``.

    The ``kwarg`` argument is not used by the function body.
    """
    LOG.info(f'mult{x, y}')
    product = x * y
    return product


@pipe
async def sleepy(x):
    """A filter which lets every item through after a short pause."""
    LOG.info(f'sleepy({x})')
    pause = 0.1  # seconds
    await asyncio.sleep(pause)
    return True


@pytest.mark.parametrize('preserve_order', (True, False))
async def test_pipe(preserve_order):
    """Values should flow through each step of the pipe."""
    chain = a_range(5) | even | mult(2)
    chain.preserve_order = preserve_order

    collected = [num async for num in chain]

    # the odd numbers are dropped, the even ones are doubled
    assert collected == [0, 4, 8]


@pytest.mark.parametrize('preserve_order', (True, False))
async def test_pipe_single(preserve_order):
    """It allows single-step pipes."""
    chain = a_range(5)
    chain.preserve_order = preserve_order

    collected = [num async for num in chain]

    # with no further steps the generator output arrives untouched
    assert collected == list(range(5))


@pytest.mark.parametrize('preserve_order', (True, False))
async def test_pipe_reusable(preserve_order):
    """A depleted pipe can be iterated over again with the same results."""
    chain = a_range(5) | even | mult(2)
    chain.preserve_order = preserve_order

    expected = [0, 4, 8]
    # iterate over the very same pipe object several times in a row
    for _ in range(5):
        collected = [num async for num in chain]
        assert collected == expected


@pytest.mark.parametrize('preserve_order', (True, False))
async def test_pipe_filter_stop(preserve_order):
    """Items failing a filter are yielded early when filter_stop=False."""
    chain = a_range(5) | even(filter_stop=False)
    chain |= mult(10)
    chain.preserve_order = preserve_order

    collected = [num async for num in chain]

    # even numbers carry on down the pipe and get multiplied by 10 whereas
    # odd numbers are yielded early (so never reach the multiplication)
    expected = [0, 1, 20, 3, 40]
    assert collected == expected


@pipe
async def one(x):
    """Return ``x`` unchanged after a random delay of under 0.2 seconds."""
    delay = random() / 5
    await asyncio.sleep(delay)
    return x


@pytest.mark.parametrize('preserve_order', (True, False))
async def test_pipe_preserve_order(preserve_order):
    """Result order should follow the pipe's preserve_order setting."""
    count = 50
    chain = a_range(count) | one | one | one
    chain.preserve_order = preserve_order

    collected = [item async for item in chain]

    # each step sleeps for a random time so it is very unlikely that all 50
    # items come out in their original order purely by chance
    in_order = collected == list(range(count))
    assert in_order is preserve_order


@pytest.mark.parametrize('preserve_order', (True, False))
async def test_pipe_concurrent(caplog, preserve_order):
    """It runs pipes concurrently.

    It is easy to make something which appears to be concurrent, this
    test is intended to ensure that it actually IS concurrent.

    Each pipe function logs a message when it is called. The order of these
    messages is compared against the order in which they would be logged if
    every item were processed to completion before the next one was started.

    """
    pipe = a_range(5) | even | sleepy | mult(2)
    pipe.preserve_order = preserve_order

    caplog.set_level(logging.INFO, 'test')
    async for _ in pipe:
        pass

    order = [
        # a list of the log messages generated by each step of the pipe
        # as it processes an item
        x[2].split('(')[0]
        for x in caplog.record_tuples
    ]

    # the log which a purely sequential implementation would produce:
    # * every item is logged by a_range and even
    # * only the even items get past the filter to reach sleepy and mult
    sequential_order = []
    for num in range(5):
        sequential_order.extend(['a_range', 'even'])
        if num % 2 == 0:
            sequential_order.extend(['sleepy', 'mult'])

    assert 'mult' in order
    # 5 items * 2 steps (a_range, even) + 3 even items * 2 steps (sleepy, mult)
    assert len(order) == len(sequential_order) == 16

    # ensure that the steps aren't completed in order (as sync code would)
    # the sleep should ensure this
    # NOTE: not the best test but better than nothing
    assert order != sequential_order


def test_pipe_str():
    """The str and repr of a pipe should be informative."""
    chain = a_range(5) | even(filter_stop=False) | mult(10, kwarg=42)
    short_form = str(chain)
    long_form = repr(chain)
    assert short_form == 'a_range(5)'
    assert long_form == 'a_range(5) | even() | mult(10, kwarg=42)'


@pipe()  # NOTE: these brackets are what the next function is testing
async def div(x, y):
    """Return ``x`` divided by ``y``."""
    quotient = x / y
    return quotient


def test_pipe_brackets():
    """Pipe functions may be declared using either @pipe or @pipe()."""
    # a_range and div are both declared with brackets
    chain = a_range(5) | div
    representation = repr(chain)
    assert representation == 'a_range(5) | div()'


# NOTE: the docstring, signature and annotations of this function are
# compared verbatim by test_documentation, keep the two in sync.
@pipe
async def documented(x: str, y: int = 0):
    """The docstring for the pipe function."""
    pass


def test_documentation():
    """The wrapped function's docstring, signature & annotations should
    survive being decorated."""
    expected_doc = 'The docstring for the pipe function.'
    expected_annotations = {'x': str, 'y': int}
    expected_signature = '(x: str, y: int = 0)'

    assert documented.__doc__ == expected_doc
    assert documented.__annotations__ == expected_annotations
    assert str(signature(documented)) == expected_signature


def test_rewind():
    """It should be possible to move through the pipe stages."""
    first = a_range | mult | even
    # go to the far end of the pipe and then all the way back again
    last = first.fastforward()
    assert last.rewind() == first


async def test_asyncqgen():
    """An async queue should be consumable via an async generator."""
    queue = asyncio.Queue()

    # NOTE: the generator is created before anything is put in the queue
    gen = asyncqgen(queue)

    for value in (1, 2, 3):
        await queue.put(value)

    drained = [item async for item in gen]

    assert drained == [1, 2, 3]


async def test_scandir(tmp_path: Path):
    """Directory listings should contain files and symlinks alike."""
    file_a = tmp_path / 'a'
    file_b = tmp_path / 'b'
    link_c = tmp_path / 'c'

    file_a.touch()
    file_b.touch()
    link_c.symlink_to(file_b)

    listing = await scandir(tmp_path)

    # sort the results so the comparison does not depend on listing order
    assert sorted(listing) == [file_a, file_b, link_c]


async def test_scandir_non_exist(tmp_path: Path):
    """Calling scandir() on a path which does not exist should raise
    FileNotFoundError."""
    missing = tmp_path / 'HORSE'
    with pytest.raises(FileNotFoundError):
        await scandir(missing)