1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
from __future__ import with_statement
import unittest
import subprocess
import sys
from plumbum import SshMachine
from plumbum.machines.paramiko_machine import ParamikoMachine
from rpyc.utils.zerodeploy import DeployedServer
from rpyc.core import DEFAULT_CONFIG
# Paramiko is an optional dependency.  Probe for it once at import time and
# remember the outcome so the Paramiko-based test can be skipped when the
# library cannot be loaded.
try:
    import paramiko  # noqa
except Exception:
    _paramiko_import_failed = True
else:
    _paramiko_import_failed = False
@unittest.skip("no passwordless ssh installed in buildd")
class TestDeploy(unittest.TestCase):
    """Tests for ``DeployedServer`` against ``localhost`` via SSH and Paramiko.

    The whole case is skipped unconditionally; see the decorator's reason.
    """

    @staticmethod
    def _export_bind_threads(rem):
        """Copy the local ``bind_threads`` default into *rem*'s environment."""
        rem.env['RPYC_BIND_THREADS'] = str(DEFAULT_CONFIG['bind_threads']).lower()

    def _check_deployed_server(self, rem):
        """Deploy a server on *rem*, call through it, then verify it is gone.

        A classic connection is opened, ``sys`` and ``os.getcwd()`` are
        fetched through it and printed, and the connection is closed.  Once
        the ``DeployedServer`` context has exited, calling the remote function
        again must raise ``EOFError``.
        """
        with DeployedServer(rem) as dep:
            conn = dep.classic_connect()
            print(conn.modules.sys)
            func = conn.modules.os.getcwd
            print(func())
            conn.close()
        with self.assertRaises(EOFError, msg="expected an EOFError"):
            func()

    def _record_close_timeouts(self, **close_kwargs):
        """Deploy over SSH, close the deployment, and report observed timeouts.

        ``subprocess.Popen.communicate`` is temporarily replaced by a wrapper
        that records the ``timeout`` of every call before delegating to the
        real implementation.  The original method is always restored.

        :param close_kwargs: keyword arguments forwarded to
            ``DeployedServer.close()``.
        :returns: list of ``timeout`` values, in call order, seen while the
            wrapper was installed.
        """
        observed_timeouts = []
        original_communicate = subprocess.Popen.communicate

        # Parameter names mirror Popen.communicate so keyword calls still work.
        def recording_communicate(proc, input=None, timeout=None):
            observed_timeouts.append(timeout)
            return original_communicate(proc, input, timeout)

        subprocess.Popen.communicate = recording_communicate
        try:
            rem = SshMachine("localhost")
            try:
                self._export_bind_threads(rem)
                # NOTE(review): presumably makes the deployed server run under
                # the same interpreter path as this process -- confirm against
                # DeployedServer.
                SshMachine.python = rem[sys.executable]
                dep = DeployedServer(rem)
                try:
                    conn = dep.classic_connect()
                finally:
                    # Closed even if connecting failed, so the deployment is
                    # not left behind.
                    dep.close(**close_kwargs)
            finally:
                rem.close()
            conn.close()
        finally:
            subprocess.Popen.communicate = original_communicate
        return observed_timeouts

    def test_deploy(self):
        """A server deployed over ``SshMachine`` works until it is torn down."""
        rem = SshMachine("localhost")
        try:
            self._export_bind_threads(rem)
            SshMachine.python = rem[sys.executable]
            self._check_deployed_server(rem)
        finally:
            # Release the SSH session even when an assertion above fails.
            rem.close()

    def test_close_timeout(self):
        """``close(timeout=...)`` forwards the timeout to ``communicate()``."""
        expected_timeout = 4
        observed_timeouts = self._record_close_timeouts(timeout=expected_timeout)
        # The last three calls to communicate() happen during close(), so
        # check they applied the timeout.
        self.assertEqual(observed_timeouts[-3:], [expected_timeout] * 3)

    def test_close_timeout_default_none(self):
        """``close()`` without a timeout calls ``communicate()`` with ``None``."""
        observed_timeouts = self._record_close_timeouts()
        # No timeout specified, so every Popen.communicate call should have
        # been made with timeout None.
        self.assertEqual(observed_timeouts, [None] * len(observed_timeouts))

    @unittest.skipIf(_paramiko_import_failed, "Paramiko is not available")
    def test_deploy_paramiko(self):
        """A server deployed over ``ParamikoMachine`` works until torn down."""
        rem = ParamikoMachine("localhost", missing_host_policy=paramiko.AutoAddPolicy())
        try:
            self._export_bind_threads(rem)
            self._check_deployed_server(rem)
        finally:
            # Release the Paramiko session even when an assertion above fails.
            rem.close()
# Run this module's tests when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|