1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
import os
import socket
import subprocess
import sys
import time
import mitogen
import mitogen.master
import mitogen.service
import mitogen.unix
import testlib
class MyService(mitogen.service.Service):
    """
    Tiny service used by the UNIX socket tests. Its only method, ping(),
    reports the source and auth IDs carried by the request message.
    """

    def __init__(self, latch, **kwargs):
        super(MyService, self).__init__(**kwargs)
        # ping() puts to this latch, so a thread blocked in latch.get() learns
        # that a client request has arrived.
        self.latch = latch

    @classmethod
    def name(cls):
        # This module gets loaded both as __main__ and under whatever name
        # unit2 imports it as, so the service name is pinned to a constant.
        return 'unix_test.MyService'

    @mitogen.service.expose(policy=mitogen.service.AllowParents())
    def ping(self, msg):
        # Wake the waiter first, then build the reply from the message fields.
        self.latch.put(None)
        reply = {}
        reply['src_id'] = msg.src_id
        reply['auth_id'] = msg.auth_id
        return reply
class IsPathDeadTest(testlib.TestCase):
    """
    Tests for mitogen.unix.is_path_dead(): it is expected to return True for
    a path that does not exist, True for a socket file nobody listens on, and
    False for a socket that is listening.
    """
    func = staticmethod(mitogen.unix.is_path_dead)
    path = '/tmp/stale-socket'

    def _remove_path(self):
        # Delete the socket file if present, so tests neither trip over a
        # leftover from an earlier run nor leave one behind themselves.
        if os.path.exists(self.path):
            os.unlink(self.path)

    def test_does_not_exist(self):
        self.assertTrue(self.func('/tmp/does-not-exist'))

    def make_socket(self):
        """
        Return a fresh AF_UNIX stream socket bound to self.path, replacing
        any file already at that path. The socket is not yet listening.
        """
        self._remove_path()
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            s.bind(self.path)
        except Exception:
            # Don't leak the descriptor when the bind fails.
            s.close()
            raise
        return s

    def test_conn_refused(self):
        # Closing a bound socket leaves its file on disk with no listener.
        s = self.make_socket()
        s.close()
        try:
            self.assertTrue(self.func(self.path))
        finally:
            self._remove_path()

    def test_is_alive(self):
        s = self.make_socket()
        try:
            s.listen(5)
            self.assertFalse(self.func(self.path))
        finally:
            # Clean up even when the assertion fails.
            s.close()
            self._remove_path()
class ListenerTest(testlib.RouterMixin, testlib.TestCase):
    """Construction test for mitogen.unix.Listener on the mixin's router."""
    klass = mitogen.unix.Listener

    def test_constructor_basic(self):
        stream = self.klass.build_stream(router=self.router)
        sock_path = stream.protocol.path
        # The listener's socket path must not be reported as dead.
        reported_dead = mitogen.unix.is_path_dead(sock_path)
        self.assertFalse(reported_dead)
        os.unlink(sock_path)
        # ensure we catch 0 byte read error log message
        self.broker.shutdown()
        self.broker.join()
        # Record that this test already shut the broker down itself.
        self.broker_shutdown = True
class ClientTest(testlib.TestCase):
    """
    End-to-end test of mitogen.unix.connect(): a child process started from
    this same file runs a Listener plus MyService, and the test process
    connects to it and calls ping().
    """
    klass = mitogen.unix.Listener

    def _try_connect(self, path):
        """
        Call mitogen.unix.connect(path), retrying every 0.1 seconds while it
        raises ConnectError. Once 30 seconds have passed the ConnectError is
        re-raised.
        """
        # give server a chance to setup listener
        deadline = mitogen.core.now() + 30.0
        while True:
            try:
                return mitogen.unix.connect(path)
            except mitogen.unix.ConnectError:
                if mitogen.core.now() > deadline:
                    raise
                time.sleep(0.1)

    def _test_simple_client(self, path):
        """
        Client half: connect to `path`, check the IDs assigned to this
        process, call MyService.ping() and check the IDs it echoes back.
        The broker is always shut down and the socket file removed.
        """
        import errno  # local import: only needed by the cleanup below

        router, context = self._try_connect(path)
        try:
            self.assertEqual(0, context.context_id)
            self.assertEqual(1, mitogen.context_id)
            self.assertEqual(0, mitogen.parent_id)
            resp = context.call_service(service_name=MyService, method_name='ping')
            self.assertEqual(mitogen.context_id, resp['src_id'])
            self.assertEqual(0, resp['auth_id'])
        finally:
            router.broker.shutdown()
            router.broker.join()
            try:
                os.unlink(path)
            except OSError:
                # A socket file that is already gone is not an error here,
                # and raising would hide any assertion failure from above.
                if sys.exc_info()[1].errno != errno.ENOENT:
                    raise

    @classmethod
    def _test_simple_server(cls, path):
        """
        Server half, run in the child process: listen on `path`, serve
        MyService until one ping arrives, shut down, then exit the process.
        Never returns.
        """
        router = mitogen.master.Router()
        latch = mitogen.core.Latch()
        # Bound up front so the cleanup below is safe even if setup fails
        # before the pool exists.
        pool = None
        try:
            try:
                listener = cls.klass.build_stream(path=path, router=router)
                pool = mitogen.service.Pool(router=router, services=[
                    MyService(latch=latch, router=router),
                ])
                # Blocks until MyService.ping() puts to the latch.
                latch.get()
                # give broker a chance to deliver service response
                time.sleep(0.1)
            finally:
                if pool is not None:
                    pool.shutdown()
                    pool.join()
                router.broker.shutdown()
                router.broker.join()
        finally:
            # os._exit() ends the process immediately, skipping normal
            # interpreter cleanup.
            # NOTE(review): the status is 0 even if the code above raised, so
            # server-side failures only show up through the client half.
            os._exit(0)

    def test_simple(self):
        path = mitogen.unix.make_socket_path()
        proc = subprocess.Popen(
            [sys.executable, __file__, 'ClientTest_server', path],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        client_ok = False
        try:
            self._test_simple_client(path)
            client_ok = True
        finally:
            # TODO :)
            # Reset the module-level IDs, which the client half expects to
            # have changed once it is connected.
            mitogen.context_id = 0
            mitogen.parent_id = None
            mitogen.parent_ids = []
            if not client_ok and proc.poll() is None:
                # The server only exits after it has been pinged. If the
                # client half failed it may wait forever, so kill it rather
                # than hang in communicate() or leak the child.
                proc.kill()
            # Always reap the child and drain both of its pipes.
            b_stdout, _ = proc.communicate()
        self.assertEqual(proc.returncode, 0)
        self.assertEqual(b_stdout.decode(), '')
# Script entry point: ClientTest.test_simple re-runs this file as
# "<file> ClientTest_server <socket path>" to start the server half.
if __name__ == '__main__':
    _cli_args = sys.argv[1:]
    if len(_cli_args) == 2 and _cli_args[0] == 'ClientTest_server':
        ClientTest._test_simple_server(path=_cli_args[1])
|