File: test_ssl.py

package info (click to toggle)
rpyc 6.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,324 kB
  • sloc: python: 6,442; makefile: 122
file content (53 lines) | stat: -rw-r--r-- 1,610 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import rpyc
import os
import unittest
from rpyc.utils.authenticators import SSLAuthenticator
from rpyc.utils.server import ThreadedServer
from rpyc import SlaveService

try:
    import ssl  # noqa
    _ssl_import_failed = False
except ImportError:
    _ssl_import_failed = True


@unittest.skip("SSL certificate expired")
class Test_SSL(unittest.TestCase):
    '''
    created key like that
    http://www.akadia.com/services/ssh_test_certificate.html

    openssl req -newkey rsa:1024 -nodes -keyout mycert.pem -out mycert.pem
    '''

    def setUp(self):
        self.key = os.path.join(os.path.dirname(__file__), "server.key")
        self.cert = os.path.join(os.path.dirname(__file__), "server.crt")
        print(self.cert, self.key)

        authenticator = SSLAuthenticator(self.key, self.cert)
        self.server = ThreadedServer(SlaveService, port=18812,
                                     auto_register=False, authenticator=authenticator)
        self.server.logger.quiet = False
        self.server._start_in_thread()

    def tearDown(self):
        while self.server.clients:
            pass
        self.server.close()

    def test_ssl_conenction(self):
        c = rpyc.classic.ssl_connect("localhost", port=18812,
                                     keyfile=self.key, certfile=self.cert)
        print(repr(c))
        print(c.modules.sys)
        print(c.modules["xml.dom.minidom"].parseString("<a/>"))
        c.execute("x = 5")
        self.assertEqual(c.namespace["x"], 5)
        self.assertEqual(c.eval("1+x"), 6)
        c.close()


if __name__ == "__main__":
    unittest.main()