1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
|
import sys
import threading
import unittest
import mitogen.core
import testlib
def yield_stuff_then_die(sender):
for x in range(5):
sender.send(x)
sender.close()
return 10
class ConstructorTest(testlib.RouterMixin, testlib.TestCase):
def test_handle(self):
recv = mitogen.core.Receiver(self.router)
self.assertIsInstance(recv.handle, int)
self.assertGreater(recv.handle, 100)
self.router.route(
mitogen.core.Message.pickled(
'hi',
dst_id=0,
handle=recv.handle,
)
)
self.assertEqual('hi', recv.get().unpickle())
class IterationTest(testlib.RouterMixin, testlib.TestCase):
def test_dead_stops_iteration(self):
recv = mitogen.core.Receiver(self.router)
fork = self.router.local()
ret = fork.call_async(yield_stuff_then_die, recv.to_sender())
self.assertEqual(list(range(5)), list(m.unpickle() for m in recv))
self.assertEqual(10, ret.get().unpickle())
def iter_and_put(self, recv, latch):
try:
for msg in recv:
latch.put(msg)
except Exception:
latch.put(sys.exc_info()[1])
def test_close_stops_iteration(self):
recv = mitogen.core.Receiver(self.router)
latch = mitogen.core.Latch()
t = threading.Thread(
target=self.iter_and_put,
args=(recv, latch),
)
t.start()
t.join(0.1)
recv.close()
t.join()
self.assertTrue(latch.empty())
class CloseTest(testlib.RouterMixin, testlib.TestCase):
def wait(self, latch, wait_recv):
try:
latch.put(wait_recv.get())
except Exception:
latch.put(sys.exc_info()[1])
def test_closes_one(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
t = threading.Thread(target=lambda: self.wait(latch, wait_recv))
t.start()
wait_recv.close()
def throw():
raise latch.get()
t.join()
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEqual(e.args[0], mitogen.core.Receiver.closed_msg)
def test_closes_all(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
ts = [
threading.Thread(target=lambda: self.wait(latch, wait_recv))
for x in range(5)
]
for t in ts:
t.start()
wait_recv.close()
def throw():
raise latch.get()
for x in range(5):
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEqual(e.args[0], mitogen.core.Receiver.closed_msg)
for t in ts:
t.join()
class OnReceiveTest(testlib.RouterMixin, testlib.TestCase):
# Verify behaviour of _on_receive dead message handling. A dead message
# should unregister the receiver and wake all threads.
def wait(self, latch, wait_recv):
try:
latch.put(wait_recv.get())
except Exception:
latch.put(sys.exc_info()[1])
def test_sender_closes_one_thread(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
t = threading.Thread(target=lambda: self.wait(latch, wait_recv))
t.start()
sender = wait_recv.to_sender()
sender.close()
def throw():
raise latch.get()
t.join()
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEqual(e.args[0], sender.explicit_close_msg)
@unittest.skip(reason=(
'Unclear if a asingle dead message received from remote should '
'cause all threads to wake up.'
))
def test_sender_closes_all_threads(self):
latch = mitogen.core.Latch()
wait_recv = mitogen.core.Receiver(self.router)
ts = [
threading.Thread(target=lambda: self.wait(latch, wait_recv))
for x in range(5)
]
for t in ts:
t.start()
sender = wait_recv.to_sender()
sender.close()
def throw():
raise latch.get()
for x in range(5):
e = self.assertRaises(mitogen.core.ChannelError, throw)
self.assertEqual(e.args[0], mitogen.core.Receiver.closed_msg)
for t in ts:
t.join()
# TODO: what happens to a Select subscribed to the receiver in this case?
class ToSenderTest(testlib.RouterMixin, testlib.TestCase):
klass = mitogen.core.Receiver
def test_returned_context(self):
myself = self.router.myself()
recv = self.klass(self.router)
self.assertEqual(myself, recv.to_sender().context)
|