File: pool_test.py

package info (click to toggle)
aioprocessing 2.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 228 kB
  • sloc: python: 1,463; sh: 13; makefile: 9
file content (74 lines) | stat: -rw-r--r-- 1,836 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import aioprocessing

from ._base_test import BaseTest, _GenMixin


def work_func(a, b):
    c = a * b
    return c


def map_func(z):
    return z * 3


def starmap(func, it):
    return map(func, *zip(*it))


class GenAioPoolTest(BaseTest, _GenMixin):
    def setUp(self):
        super().setUp()
        self.Obj = aioprocessing.AioPool
        self.inst = self.Obj(1)
        self.initargs = (1,)
        self.meth = "coro_map"
        self.args = (map_func, [1, 2, 3])


class PoolTest(BaseTest):
    def setUp(self):
        super().setUp()
        self.pool = aioprocessing.AioPool()

    def tearDown(self):
        super().tearDown()
        self.pool.close()
        self.pool.join()

    def test_ctx_mgr(self):
        with aioprocessing.AioPool() as pool:
            self.assertIsInstance(pool, aioprocessing.pool.AioPool)
        self.assertRaises(ValueError, pool.map, map_func, [1])

    def test_coro_apply(self):
        async def do_apply():
            out = await self.pool.coro_apply(work_func, args=(2, 3))
            self.assertEqual(out, 6)

        self.loop.run_until_complete(do_apply())

    def test_coro_map(self):
        it = list(range(5))

        async def do_map():
            out = await self.pool.coro_map(map_func, it)
            self.assertEqual(out, list(map(map_func, it)))

        self.loop.run_until_complete(do_map())

    def test_coro_starmap(self):
        it = list(zip(range(5), range(5, 10)))

        async def do_starmap():
            out = await self.pool.coro_starmap(work_func, it)
            self.assertEqual(out, list(starmap(work_func, it)))

        self.loop.run_until_complete(do_starmap())

    def test_coro_join(self):
        async def do_join():
            await self.pool.coro_join()

        self.pool.close()
        self.loop.run_until_complete(do_join())