1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
|
"""
Tests the stem.process functions with various use cases.
"""
import shutil
import subprocess
import tempfile
import time
import unittest
import stem.prereq
import stem.process
import stem.socket
import stem.util.system
import stem.version
import test.runner
try:
# added in python 3.3
from unittest.mock import patch
except ImportError:
from mock import patch
class TestProcess(unittest.TestCase):
def setUp(self):
self.data_directory = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.data_directory)
def test_launch_tor_with_config(self):
"""
Exercises launch_tor_with_config.
"""
if test.runner.only_run_once(self, 'test_launch_tor_with_config'):
return
# Launch tor without a torrc, but with a control port. Confirms that this
# works by checking that we're still able to access the new instance.
runner = test.runner.get_runner()
tor_process = stem.process.launch_tor_with_config(
tor_cmd = runner.get_tor_command(),
config = {
'SocksPort': '2777',
'ControlPort': '2778',
'DataDirectory': self.data_directory,
},
completion_percent = 5
)
control_socket = None
try:
control_socket = stem.socket.ControlPort(port = 2778)
stem.connection.authenticate(control_socket, chroot_path = runner.get_chroot())
# exercises the socket
control_socket.send('GETCONF ControlPort')
getconf_response = control_socket.recv()
self.assertEquals('ControlPort=2778', str(getconf_response))
finally:
if control_socket:
control_socket.close()
tor_process.kill()
tor_process.wait()
def test_launch_tor_with_timeout(self):
"""
Runs launch_tor where it times out before completing.
"""
if test.runner.only_run_once(self, 'test_launch_tor_with_timeout'):
return
runner = test.runner.get_runner()
start_time = time.time()
config = {'SocksPort': '2777', 'DataDirectory': self.data_directory}
self.assertRaises(OSError, stem.process.launch_tor_with_config, config, runner.get_tor_command(), 100, None, 2)
runtime = time.time() - start_time
if not (runtime > 2 and runtime < 3):
self.fail('Test should have taken 2-3 seconds, took %i instead' % runtime)
@patch('os.getpid')
def test_take_ownership_via_pid(self, getpid_mock):
"""
Checks that the tor process quits after we do if we set take_ownership. To
test this we spawn a process and trick tor into thinking that it is us.
"""
if not stem.util.system.is_available('sleep'):
test.runner.skip(self, "('sleep' command is unavailable)")
return
elif test.runner.only_run_once(self, 'test_take_ownership_via_pid'):
return
elif test.runner.require_version(self, stem.version.Requirement.TAKEOWNERSHIP):
return
sleep_process = subprocess.Popen(['sleep', '60'])
getpid_mock.return_value = str(sleep_process.pid)
tor_process = stem.process.launch_tor_with_config(
tor_cmd = test.runner.get_runner().get_tor_command(),
config = {
'SocksPort': '2777',
'ControlPort': '2778',
'DataDirectory': self.data_directory,
},
completion_percent = 5,
take_ownership = True,
)
# Kill the sleep command. Tor should quit shortly after.
sleep_process.kill()
sleep_process.communicate()
# tor polls for the process every fifteen seconds so this may take a
# while...
for seconds_waited in xrange(30):
if tor_process.poll() == 0:
return # tor exited
time.sleep(1)
self.fail("tor didn't quit after the process that owned it terminated")
def test_take_ownership_via_controller(self):
"""
Checks that the tor process quits after the controller that owns it
connects, then disconnects..
"""
if test.runner.only_run_once(self, 'test_take_ownership_via_controller'):
return
elif test.runner.require_version(self, stem.version.Requirement.TAKEOWNERSHIP):
return
tor_process = stem.process.launch_tor_with_config(
tor_cmd = test.runner.get_runner().get_tor_command(),
config = {
'SocksPort': '2777',
'ControlPort': '2778',
'DataDirectory': self.data_directory,
},
completion_percent = 5,
take_ownership = True,
)
# We're the controlling process. Just need to connect then disconnect.
controller = stem.control.Controller.from_port(port = 2778)
controller.authenticate()
controller.close()
# give tor a few seconds to quit
for seconds_waited in xrange(5):
if tor_process.poll() == 0:
return # tor exited
time.sleep(1)
self.fail("tor didn't quit after the controller that owned it disconnected")
|