File: test_multiprocessing.py

package info (click to toggle)
python-maggma 0.70.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,416 kB
  • sloc: python: 10,150; makefile: 12
file content (78 lines) | stat: -rw-r--r-- 1,886 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import time
from concurrent.futures import ThreadPoolExecutor

import pytest

from maggma.cli.multiprocessing import AsyncUnorderedMap, BackPressure, grouper, safe_dispatch


@pytest.mark.asyncio()
async def test_grouper():
    """Check the size and the number of the groups produced by ``grouper``.

    A 100-item stream grouped with ``n=10`` is expected to arrive as ten
    groups of 10; a 9-item stream grouped with ``n=10`` is expected to
    arrive as a single group of 9.
    """

    async def count_up(count):
        # Minimal async source yielding 0 .. count - 1.
        for i in range(count):
            yield i

    # Groups are counted as they arrive: without the count, the size
    # assertions inside the loops would pass vacuously if ``grouper``
    # yielded nothing at all.
    full_groups = 0
    async for group in grouper(count_up(100), n=10):
        assert len(group) == 10
        full_groups += 1
    assert full_groups == 10

    short_groups = 0
    async for group in grouper(count_up(9), n=10):
        assert len(group) == 9
        short_groups += 1
    assert short_groups == 1


def wait_and_return(x, delay=1):
    """Block for ``delay`` seconds, then return ``x`` multiplied by itself.

    Args:
        x: Value to square; anything that supports ``x * x``.
        delay: Seconds to sleep before returning. Defaults to 1, which is
            the pause this helper used when the value was hard-coded.

    Returns:
        The result of ``x * x``.
    """
    time.sleep(delay)
    return x * x


async def arange(n):
    """Asynchronously yield the integers of ``range(n)`` in order."""
    numbers = range(n)
    for value in numbers:
        yield value


@pytest.mark.asyncio()
async def test_backpressure():
    """Walk a ``BackPressure`` wrapper through locking, releasing and exhaustion."""
    pressure = BackPressure(range(10), 2)

    # Pull two items through, as the original test does to fill the process queue
    for _ in range(2):
        await pressure.__anext__()

    # After those two pulls the back-pressure lock must be held
    assert pressure.back_pressure.locked()

    # A single step of the releaser must free the lock again
    release_iter = pressure.release(arange(10))
    await release_iter.__anext__()
    assert not pressure.back_pressure.locked()

    # Releasing more often than items were pulled must not raise
    for _ in range(2):
        await release_iter.__anext__()

    # Draining the releaser must finish with StopAsyncIteration
    with pytest.raises(StopAsyncIteration):  # noqa: PT012
        for _ in range(10):
            await release_iter.__anext__()

    assert not pressure.back_pressure.locked()


@pytest.mark.asyncio()
async def test_async_map():
    """Check that ``AsyncUnorderedMap`` yields every mapped value.

    Results are collected into a set and compared with the expected set of
    squares, so the order in which values are produced does not matter.
    """
    true_values = {x * x for x in range(3)}
    finished_vals = set()

    # Using the executor as a context manager shuts the pool down on exit,
    # including when the map or an assertion raises, so the worker thread
    # is not left running after the test.
    with ThreadPoolExecutor(1) as executor:
        amap = AsyncUnorderedMap(wait_and_return, arange(3), executor)
        async for finished_val in amap:
            finished_vals.add(finished_val)

    assert finished_vals == true_values


def test_safe_dispatch():
    """Calling ``safe_dispatch`` with a callable that raises must not raise here."""

    def always_fails(val):
        raise ValueError("AAAH")

    # The payload is a (callable, argument) tuple, as in the original call.
    payload = (always_fails, "")
    safe_dispatch(payload)