1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
import rpyc
import os
import unittest
from rpyc.utils.authenticators import SSLAuthenticator
from rpyc.utils.server import ThreadedServer
from rpyc import SlaveService
# Probe for the optional ``ssl`` module and record the outcome in a
# module-level flag; only the import itself sits inside the guarded region.
try:
    import ssl  # noqa
except ImportError:
    _ssl_import_failed = True
else:
    _ssl_import_failed = False
@unittest.skip("SSL certificate expired")
class Test_SSL(unittest.TestCase):
    """Exercise a classic rpyc connection over SSL against a local server.

    The key/certificate pair was created like this
    (http://www.akadia.com/services/ssh_test_certificate.html)::

        openssl req -newkey rsa:1024 -nodes -keyout mycert.pem -out mycert.pem

    The whole case is currently skipped (see the decorator's reason).
    """

    # Longest time (seconds) tearDown waits for clients to disconnect
    # before closing the server anyway.
    _DISCONNECT_TIMEOUT = 5.0
    # Pause (seconds) between two checks of the server's client set.
    _POLL_INTERVAL = 0.01

    def setUp(self):
        """Start an SSL-authenticated ThreadedServer on port 18812.

        The key and certificate are looked up next to this test module.
        """
        here = os.path.dirname(__file__)
        self.key = os.path.join(here, "server.key")
        self.cert = os.path.join(here, "server.crt")
        print(self.cert, self.key)
        authenticator = SSLAuthenticator(self.key, self.cert)
        self.server = ThreadedServer(
            SlaveService,
            port=18812,
            auto_register=False,
            authenticator=authenticator,
        )
        self.server.logger.quiet = False
        self.server._start_in_thread()

    def tearDown(self):
        """Wait briefly for clients to disconnect, then close the server.

        The wait is bounded and sleeps between polls: an unbounded
        ``while ...: pass`` loop pins a CPU core and never returns if a
        client connection is left open.
        """
        # Imported locally so this class does not rely on a module-level
        # ``import time`` being present.
        import time

        deadline = time.monotonic() + self._DISCONNECT_TIMEOUT
        while self.server.clients and time.monotonic() < deadline:
            time.sleep(self._POLL_INTERVAL)
        self.server.close()

    def test_ssl_conenction(self):
        """Connect over SSL and run a few remote operations."""
        c = rpyc.classic.ssl_connect("localhost", port=18812,
                                     keyfile=self.key, certfile=self.cert)
        try:
            print(repr(c))
            print(c.modules.sys)
            print(c.modules["xml.dom.minidom"].parseString("<a/>"))
            c.execute("x = 5")
            self.assertEqual(c.namespace["x"], 5)
            self.assertEqual(c.eval("1+x"), 6)
        finally:
            # Always release the connection, even when an assertion fails,
            # so tearDown is not left waiting on a leaked client.
            c.close()
# Run the test suite only when this module is executed directly as a
# script; importing it must not start the runner.
if __name__ == "__main__":
    unittest.main()
|