1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
import unittest
from concurrent.futures import ProcessPoolExecutor
import aioprocessing
from aioprocessing.mp import Process, Event, util
from ._base_test import BaseTest, _GenMixin
def queue_put(q, val):
    """Put ``val`` on ``q`` and return whatever ``q.put`` returned.

    Defined at module level; the tests in this file hand it to ``Process``
    as a target and submit it to a ``ProcessPoolExecutor``.
    """
    return q.put(val)
def queue_get(q, e):
    """Take one item from ``q``, set ``e``, then put the item back on ``q``."""
    item = q.get()
    # The event is set after the item is received and before it is echoed.
    e.set()
    q.put(item)
class GenQueueMixin(_GenMixin):
    """Queue fixture layered on ``_GenMixin``.

    ``setUp`` calls ``self.Obj()``, so a subclass has to assign ``self.Obj``
    before delegating to it.
    """

    def setUp(self):
        """Build the queue instance and record ``"coro_get"`` in ``self.meth``."""
        super().setUp()
        # NOTE(review): ``inst`` / ``meth`` are presumably consumed by
        # ``_GenMixin``, which is not visible here -- confirm.
        self.inst, self.meth = self.Obj(), "coro_get"

    def _after(self):
        """Put ``1`` on the queue instance."""
        self.inst.put(1)
class GenAioQueueTest(GenQueueMixin, BaseTest):
    """Binds the ``GenQueueMixin`` fixture to ``aioprocessing.AioQueue``."""

    def setUp(self):
        # Obj must be assigned first: GenQueueMixin.setUp calls self.Obj().
        self.Obj = aioprocessing.AioQueue
        super().setUp()
class GenAioSimpleQueueTest(GenQueueMixin, BaseTest):
    """Binds the ``GenQueueMixin`` fixture to ``aioprocessing.AioSimpleQueue``."""

    def setUp(self):
        # Obj must be assigned first: GenQueueMixin.setUp calls self.Obj().
        self.Obj = aioprocessing.AioSimpleQueue
        super().setUp()
class GenAioJoinableQueueTest(GenQueueMixin, BaseTest):
    """Binds the ``GenQueueMixin`` fixture to ``aioprocessing.AioJoinableQueue``."""

    def setUp(self):
        # Obj must be assigned first: GenQueueMixin.setUp calls self.Obj().
        self.Obj = aioprocessing.AioJoinableQueue
        super().setUp()
class QueueTest(BaseTest):
    """Round-trip tests for ``AioQueue`` and ``AioSimpleQueue``."""

    def test_blocking_put(self):
        """A value sent with ``coro_put`` is readable with a plain ``get``."""
        aq = aioprocessing.AioQueue()

        async def put_one():
            await aq.coro_put(1)

        self.loop.run_until_complete(put_one())
        self.assertEqual(aq.get(), 1)

    def test_put_get(self):
        """A child process puts a value; the loop receives it via ``coro_get``."""
        expected = 1
        aq = aioprocessing.AioQueue()
        producer = Process(target=queue_put, args=(aq, expected))

        async def receive():
            received = await aq.coro_get()
            self.assertEqual(received, expected)

        producer.start()
        self.loop.run_until_complete(receive())
        producer.join()

    def test_get_put(self):
        """The loop puts a value; a child process takes it and echoes it back."""
        aq = aioprocessing.AioQueue()
        echoed_event = Event()
        expected = 2

        async def send():
            await aq.coro_put(expected)

        echo_proc = Process(target=queue_get, args=(aq, echoed_event))
        echo_proc.start()
        self.loop.run_until_complete(send())
        # queue_get sets the event once it has taken the item off the queue;
        # wait for that before reading the queue from this process.
        echoed_event.wait()
        echoed = aq.get()
        echo_proc.join()
        self.assertEqual(echoed, expected)

    def test_simple_queue(self):
        """``AioSimpleQueue`` returns from ``get`` what ``coro_put`` sent."""
        sq = aioprocessing.AioSimpleQueue()
        expected = 8

        async def send():
            await sq.coro_put(expected)

        self.loop.run_until_complete(send())
        received = sq.get()
        self.assertEqual(expected, received)
class ManagerQueueTest(BaseTest):
    """Tests a manager-created ``AioQueue`` shared with a process pool."""

    @unittest.skipIf(
        "multiprocess.util" in str(util),
        "concurrent.futures is not yet supported by uqfoundation "
        "(https://github.com/uqfoundation/pathos/issues/90)"
    )
    def test_executor(self):
        """A pool worker puts a value; the loop gets it and puts one back.

        The worker runs ``queue_put`` against the manager queue. The event
        loop then awaits that value with ``coro_get``, sends ``5`` with
        ``coro_put``, and the test reads ``5`` back with a plain ``get``.
        """
        manager = aioprocessing.AioManager()
        q = manager.AioQueue()
        val = 4

        async def get_then_put():
            out = await q.coro_get()
            self.assertEqual(out, val)
            await q.coro_put(5)

        # The context manager shuts the pool down even when an assertion
        # below fails, so a failing test does not leave a worker running.
        with ProcessPoolExecutor(max_workers=1) as pool:
            pool.submit(queue_put, q, val)
            self.loop.run_until_complete(get_then_put())
            returned = q.get()
            self.assertEqual(returned, 5)
class JoinableQueueTest(BaseTest):
    """Tests for ``AioJoinableQueue``."""

    def test_join_empty_queue(self):
        """``coro_join`` on a freshly created queue completes."""
        jq = aioprocessing.AioJoinableQueue()

        async def wait_for_join():
            await jq.coro_join()

        self.loop.run_until_complete(wait_for_join())
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|