File: process.py

package info (click to toggle)
python-stem 1.2.2-1.1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 4,568 kB
  • ctags: 2,036
  • sloc: python: 20,108; makefile: 127; sh: 3
file content (167 lines) | stat: -rw-r--r-- 4,793 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""
Tests the stem.process functions with various use cases.
"""

import shutil
import subprocess
import tempfile
import time
import unittest

import stem.prereq
import stem.process
import stem.socket
import stem.util.system
import stem.version
import test.runner

try:
  # added in python 3.3
  from unittest.mock import patch
except ImportError:
  from mock import patch


class TestProcess(unittest.TestCase):
  def setUp(self):
    self.data_directory = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.data_directory)

  def test_launch_tor_with_config(self):
    """
    Exercises launch_tor_with_config.
    """

    if test.runner.only_run_once(self, 'test_launch_tor_with_config'):
      return

    # Launch tor without a torrc, but with a control port. Confirms that this
    # works by checking that we're still able to access the new instance.

    runner = test.runner.get_runner()
    tor_process = stem.process.launch_tor_with_config(
      tor_cmd = runner.get_tor_command(),
      config = {
        'SocksPort': '2777',
        'ControlPort': '2778',
        'DataDirectory': self.data_directory,
      },
      completion_percent = 5
    )

    control_socket = None
    try:
      control_socket = stem.socket.ControlPort(port = 2778)
      stem.connection.authenticate(control_socket, chroot_path = runner.get_chroot())

      # exercises the socket
      control_socket.send('GETCONF ControlPort')
      getconf_response = control_socket.recv()
      self.assertEquals('ControlPort=2778', str(getconf_response))
    finally:
      if control_socket:
        control_socket.close()

      tor_process.kill()
      tor_process.wait()

  def test_launch_tor_with_timeout(self):
    """
    Runs launch_tor where it times out before completing.
    """

    if test.runner.only_run_once(self, 'test_launch_tor_with_timeout'):
      return

    runner = test.runner.get_runner()
    start_time = time.time()
    config = {'SocksPort': '2777', 'DataDirectory': self.data_directory}
    self.assertRaises(OSError, stem.process.launch_tor_with_config, config, runner.get_tor_command(), 100, None, 2)
    runtime = time.time() - start_time

    if not (runtime > 2 and runtime < 3):
      self.fail('Test should have taken 2-3 seconds, took %i instead' % runtime)

  @patch('os.getpid')
  def test_take_ownership_via_pid(self, getpid_mock):
    """
    Checks that the tor process quits after we do if we set take_ownership. To
    test this we spawn a process and trick tor into thinking that it is us.
    """

    if not stem.util.system.is_available('sleep'):
      test.runner.skip(self, "('sleep' command is unavailable)")
      return
    elif test.runner.only_run_once(self, 'test_take_ownership_via_pid'):
      return
    elif test.runner.require_version(self, stem.version.Requirement.TAKEOWNERSHIP):
      return

    sleep_process = subprocess.Popen(['sleep', '60'])
    getpid_mock.return_value = str(sleep_process.pid)

    tor_process = stem.process.launch_tor_with_config(
      tor_cmd = test.runner.get_runner().get_tor_command(),
      config = {
        'SocksPort': '2777',
        'ControlPort': '2778',
        'DataDirectory': self.data_directory,
      },
      completion_percent = 5,
      take_ownership = True,
    )

    # Kill the sleep command. Tor should quit shortly after.

    sleep_process.kill()
    sleep_process.communicate()

    # tor polls for the process every fifteen seconds so this may take a
    # while...

    for seconds_waited in xrange(30):
      if tor_process.poll() == 0:
        return  # tor exited

      time.sleep(1)

    self.fail("tor didn't quit after the process that owned it terminated")

  def test_take_ownership_via_controller(self):
    """
    Checks that the tor process quits after the controller that owns it
    connects, then disconnects..
    """

    if test.runner.only_run_once(self, 'test_take_ownership_via_controller'):
      return
    elif test.runner.require_version(self, stem.version.Requirement.TAKEOWNERSHIP):
      return

    tor_process = stem.process.launch_tor_with_config(
      tor_cmd = test.runner.get_runner().get_tor_command(),
      config = {
        'SocksPort': '2777',
        'ControlPort': '2778',
        'DataDirectory': self.data_directory,
      },
      completion_percent = 5,
      take_ownership = True,
    )

    # We're the controlling process. Just need to connect then disconnect.

    controller = stem.control.Controller.from_port(port = 2778)
    controller.authenticate()
    controller.close()

    # give tor a few seconds to quit
    for seconds_waited in xrange(5):
      if tor_process.poll() == 0:
        return  # tor exited

      time.sleep(1)

    self.fail("tor didn't quit after the controller that owned it disconnected")