File: unix_test.py

package info (click to toggle)
python-mitogen 0.3.26-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 6,456 kB
  • sloc: python: 22,134; sh: 183; makefile: 74; perl: 19; ansic: 18
file content (146 lines) | stat: -rw-r--r-- 4,453 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import socket
import subprocess
import sys
import time

import mitogen
import mitogen.master
import mitogen.service
import mitogen.unix

import testlib


class MyService(mitogen.service.Service):
    def __init__(self, latch, **kwargs):
        super(MyService, self).__init__(**kwargs)
        # used to wake up main thread once client has made its request
        self.latch = latch

    @classmethod
    def name(cls):
        # Because this is loaded from both __main__ and whatever unit2 does,
        # specify a fixed name.
        return 'unix_test.MyService'

    @mitogen.service.expose(policy=mitogen.service.AllowParents())
    def ping(self, msg):
        self.latch.put(None)
        return {
            'src_id': msg.src_id,
            'auth_id': msg.auth_id,
        }


class IsPathDeadTest(testlib.TestCase):
    func = staticmethod(mitogen.unix.is_path_dead)
    path = '/tmp/stale-socket'

    def test_does_not_exist(self):
        self.assertTrue(self.func('/tmp/does-not-exist'))

    def make_socket(self):
        if os.path.exists(self.path):
            os.unlink(self.path)
        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        s.bind(self.path)
        return s

    def test_conn_refused(self):
        s = self.make_socket()
        s.close()
        self.assertTrue(self.func(self.path))

    def test_is_alive(self):
        s = self.make_socket()
        s.listen(5)
        self.assertFalse(self.func(self.path))
        s.close()
        os.unlink(self.path)


class ListenerTest(testlib.RouterMixin, testlib.TestCase):
    klass = mitogen.unix.Listener

    def test_constructor_basic(self):
        listener = self.klass.build_stream(router=self.router)
        self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path))
        os.unlink(listener.protocol.path)

        # ensure we catch 0 byte read error log message
        self.broker.shutdown()
        self.broker.join()
        self.broker_shutdown = True


class ClientTest(testlib.TestCase):
    klass = mitogen.unix.Listener

    def _try_connect(self, path):
        # give server a chance to setup listener
        timeout = mitogen.core.now() + 30.0
        while True:
            try:
                return mitogen.unix.connect(path)
            except mitogen.unix.ConnectError:
                if mitogen.core.now() > timeout:
                    raise
                time.sleep(0.1)

    def _test_simple_client(self, path):
        router, context = self._try_connect(path)
        try:
            self.assertEqual(0, context.context_id)
            self.assertEqual(1, mitogen.context_id)
            self.assertEqual(0, mitogen.parent_id)
            resp = context.call_service(service_name=MyService, method_name='ping')
            self.assertEqual(mitogen.context_id, resp['src_id'])
            self.assertEqual(0, resp['auth_id'])
        finally:
            router.broker.shutdown()
            router.broker.join()
            os.unlink(path)

    @classmethod
    def _test_simple_server(cls, path):
        router = mitogen.master.Router()
        latch = mitogen.core.Latch()
        try:
            try:
                listener = cls.klass.build_stream(path=path, router=router)
                pool = mitogen.service.Pool(router=router, services=[
                    MyService(latch=latch, router=router),
                ])
                latch.get()
                # give broker a chance to deliver service resopnse
                time.sleep(0.1)
            finally:
                pool.shutdown()
                pool.join()
                router.broker.shutdown()
                router.broker.join()
        finally:
            os._exit(0)

    def test_simple(self):
        path = mitogen.unix.make_socket_path()
        proc = subprocess.Popen(
            [sys.executable, __file__, 'ClientTest_server', path],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        try:
            self._test_simple_client(path)
        finally:
            # TODO :)
            mitogen.context_id = 0
            mitogen.parent_id = None
            mitogen.parent_ids = []
        b_stdout, _ = proc.communicate()
        self.assertEqual(proc.returncode, 0)
        self.assertEqual(b_stdout.decode(), '')


if __name__ == '__main__':
    if len(sys.argv) == 3 and sys.argv[1] == 'ClientTest_server':
        ClientTest._test_simple_server(path=sys.argv[2])