File: test_registry.py

package info (click to toggle)
rpyc 6.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,324 kB
  • sloc: python: 6,442; makefile: 122
file content (98 lines) | stat: -rw-r--r-- 2,818 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import time
import unittest

import rpyc
from rpyc.utils.registry import TCPRegistryServer, TCPRegistryClient
from rpyc.utils.registry import UDPRegistryServer, UDPRegistryClient


PRUNING_TIMEOUT = 5


class BaseRegistryTest(object):
    def _get_server(self):
        raise NotImplementedError

    def _get_client(self):
        raise NotImplementedError

    def setUp(self):
        self.server = self._get_server()
        self.server.logger.quiet = True
        self.server_thread = rpyc.spawn(self.server.start)

    def tearDown(self):
        self.server.close()
        self.server_thread.join()

    def test_api(self):
        c = self._get_client()
        c.logger.quiet = True
        c.register(("FOO",), 12345)
        c.register(("FOO",), 45678)
        res = c.discover("FOO")
        expected = (12345, 45678)
        self.assertEqual(set(p for _, p in res), set(expected))
        c.unregister(12345)
        res = c.discover("FOO")
        expected = (45678,)
        self.assertEqual(set(p for _, p in res), set(expected))
        res = c.list()
        expected = ("FOO",)
        self.assertEqual(set(p for p in res), set(expected))
        c.register(("BAR",), 54321)
        res = c.list()
        expected = ("FOO", "BAR")
        self.assertEqual(set(res), set(expected))

    def test_pruning(self):
        c = self._get_client()
        c.logger.quiet = True
        c.register(("BAR",), 17171)

        time.sleep(1)
        res = c.discover("BAR")
        self.assertEqual(set(p for _, p in res), set((17171,)))

        time.sleep(PRUNING_TIMEOUT)
        res = c.discover("BAR")
        self.assertEqual(res, ())

    def test_listing(self):
        c = self._get_client()
        c.logger.quiet = True

        c.register(("FOO",), 12345)
        c.register(("BAR", ), 54321, interface='127.0.0.2')
        host_ip = c.discover("FOO")[0][0]

        # test basic listing
        res = c.list()
        expected = ("FOO", "BAR")
        self.assertEqual(set(p for p in res), set(expected))

        # test listing with filter
        res = c.list(filter_host=host_ip)
        expected = ("FOO",)
        self.assertEqual(set(res), set(expected))


class TestTcpRegistry(BaseRegistryTest, unittest.TestCase):
    def _get_server(self):
        return TCPRegistryServer(host="127.0.0.1", pruning_timeout=PRUNING_TIMEOUT, allow_listing=True)

    def _get_client(self):
        return TCPRegistryClient(ip="127.0.0.1")


class TestUdpRegistry(BaseRegistryTest, unittest.TestCase):
    """ May fail due to iptables/packet-drops. """
    def _get_server(self):
        return UDPRegistryServer(host="0.0.0.0", pruning_timeout=PRUNING_TIMEOUT, allow_listing=True)

    def _get_client(self):
        return UDPRegistryClient("localhost")


if __name__ == "__main__":
    unittest.main()