File: commands.py

package info (click to toggle)
python-stem 1.2.2-1.1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 4,568 kB
  • ctags: 2,036
  • sloc: python: 20,108; makefile: 127; sh: 3
file content (197 lines) | stat: -rw-r--r-- 6,995 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import datetime
import unittest

import stem
import stem.response
import stem.version

from stem.interpreter.commands import ControlInterpretor, _get_fingerprint

from test import mocking
from test.unit.interpreter import CONTROLLER

try:
  # added in python 3.3
  from unittest.mock import Mock, patch
except ImportError:
  from mock import Mock, patch

EXPECTED_EVENTS_RESPONSE = """\
\x1b[34mBW 15 25\x1b[0m
\x1b[34mBW 758 570\x1b[0m
\x1b[34mDEBUG connection_edge_process_relay_cell(): Got an extended cell! Yay.\x1b[0m
"""

EXPECTED_INFO_RESPONSE = """\
moria1 (9695DFC35FFEB861329B9F1AB04C46397020CE31)
\x1b[34;1maddress: \x1b[0m128.31.0.34:9101 (moria.csail.mit.edu)
\x1b[34;1mtor version: \x1b[0m0.2.5.4-alpha-dev
\x1b[34;1mflags: \x1b[0mAuthority, Fast, Guard, HSDir, Named, Running, Stable, V2Dir, Valid
\x1b[34;1mexit policy: \x1b[0mreject *:*
\x1b[34;1mcontact: \x1b[0m1024D/28988BF5 arma mit edu
"""

EXPECTED_GETCONF_RESPONSE = """\
\x1b[34;1mlog\x1b[0m\x1b[34m => notice stdout\x1b[0m
\x1b[34;1maddress\x1b[0m\x1b[34m => \x1b[0m

"""

FINGERPRINT = '9695DFC35FFEB861329B9F1AB04C46397020CE31'


class TestInterpretorCommands(unittest.TestCase):
  def test_get_fingerprint_for_ourselves(self):
    controller = Mock()

    controller.get_info.side_effect = lambda arg: {
      'fingerprint': FINGERPRINT,
    }[arg]

    self.assertEqual(FINGERPRINT, _get_fingerprint('', controller))

    controller.get_info.side_effect = stem.ControllerError
    self.assertRaises(ValueError, _get_fingerprint, '', controller)

  def test_get_fingerprint_for_fingerprint(self):
    self.assertEqual(FINGERPRINT, _get_fingerprint(FINGERPRINT, Mock()))

  def test_get_fingerprint_for_nickname(self):
    controller, descriptor = Mock(), Mock()
    descriptor.fingerprint = FINGERPRINT

    controller.get_network_status.side_effect = lambda arg: {
      'moria1': descriptor,
    }[arg]

    self.assertEqual(FINGERPRINT, _get_fingerprint('moria1', controller))

    controller.get_network_status.side_effect = stem.ControllerError
    self.assertRaises(ValueError, _get_fingerprint, 'moria1', controller)

  def test_get_fingerprint_for_address(self):
    controller = Mock()

    self.assertRaises(ValueError, _get_fingerprint, '127.0.0.1:-1', controller)
    self.assertRaises(ValueError, _get_fingerprint, '127.0.0.901:80', controller)

    descriptor = Mock()
    descriptor.address = '127.0.0.1'
    descriptor.or_port = 80
    descriptor.fingerprint = FINGERPRINT

    controller.get_network_statuses.return_value = [descriptor]

    self.assertEqual(FINGERPRINT, _get_fingerprint('127.0.0.1', controller))
    self.assertEqual(FINGERPRINT, _get_fingerprint('127.0.0.1:80', controller))
    self.assertRaises(ValueError, _get_fingerprint, '127.0.0.1:81', controller)
    self.assertRaises(ValueError, _get_fingerprint, '127.0.0.2', controller)

  def test_get_fingerprint_for_unrecognized_inputs(self):
    self.assertRaises(ValueError, _get_fingerprint, 'blarg!', Mock())

  def test_when_disconnected(self):
    controller = Mock()
    controller.is_alive.return_value = False

    interpreter = ControlInterpretor(controller)
    self.assertRaises(stem.SocketClosed, interpreter.run_command, '/help')

  def test_quit(self):
    interpreter = ControlInterpretor(CONTROLLER)
    self.assertRaises(stem.SocketClosed, interpreter.run_command, '/quit')
    self.assertRaises(stem.SocketClosed, interpreter.run_command, 'QUIT')

  def test_help(self):
    interpreter = ControlInterpretor(CONTROLLER)

    self.assertTrue('Interpretor commands include:' in interpreter.run_command('/help'))
    self.assertTrue('Queries the tor process for information.' in interpreter.run_command('/help GETINFO'))
    self.assertTrue('Queries the tor process for information.' in interpreter.run_command('/help GETINFO version'))

  def test_events(self):
    interpreter = ControlInterpretor(CONTROLLER)

    # no received events

    self.assertEqual('\n', interpreter.run_command('/events'))

    # with enqueued events

    event_contents = (
      '650 BW 15 25',
      '650 BW 758 570',
      '650 DEBUG connection_edge_process_relay_cell(): Got an extended cell! Yay.',
    )

    for content in event_contents:
      event = mocking.get_message(content)
      stem.response.convert('EVENT', event)
      interpreter._received_events.append(event)

    self.assertEqual(EXPECTED_EVENTS_RESPONSE, interpreter.run_command('/events'))

  @patch('socket.gethostbyaddr', Mock(return_value = ['moria.csail.mit.edu']))
  def test_info(self):
    controller, server_desc, ns_desc = Mock(), Mock(), Mock()

    controller.get_microdescriptor.return_value = None
    controller.get_server_descriptor.return_value = server_desc
    controller.get_network_status.return_value = ns_desc

    controller.get_info.side_effect = lambda arg, _: {
      'ip-to-country/128.31.0.34': 'us',
    }[arg]

    ns_desc.address = '128.31.0.34'
    ns_desc.or_port = 9101
    ns_desc.published = datetime.datetime(2014, 5, 5, 5, 52, 5)
    ns_desc.nickname = 'moria1'
    ns_desc.flags = ['Authority', 'Fast', 'Guard', 'HSDir', 'Named', 'Running', 'Stable', 'V2Dir', 'Valid']

    server_desc.exit_policy.summary.return_value = 'reject 1-65535'
    server_desc.platform = 'Linux'
    server_desc.tor_version = stem.version.Version('0.2.5.3-alpha-dev')
    server_desc.contact = '1024D/28988BF5 arma mit edu'

    interpreter = ControlInterpretor(controller)
    self.assertTrue(interpreter.run_command('/info ' + FINGERPRINT).startswith(EXPECTED_INFO_RESPONSE))

  def test_unrecognized_interpreter_command(self):
    interpreter = ControlInterpretor(CONTROLLER)

    expected = "\x1b[1;31m'/unrecognized' isn't a recognized command\x1b[0m\n"
    self.assertEqual(expected, interpreter.run_command('/unrecognized'))

  def test_getinfo(self):
    response = '250-version=0.2.5.1-alpha-dev (git-245ecfff36c0cecc)\r\n250 OK'

    controller = Mock()
    controller.msg.return_value = mocking.get_message(response)

    interpreter = ControlInterpretor(controller)

    self.assertEqual('\x1b[34m%s\x1b[0m\n' % response, interpreter.run_command('GETINFO version'))
    controller.msg.assert_called_with('GETINFO version')

    controller.msg.side_effect = stem.ControllerError('kaboom!')
    self.assertEqual('\x1b[1;31mkaboom!\x1b[0m\n', interpreter.run_command('getinfo process/user'))

  def test_getconf(self):
    response = '250-Log=notice stdout\r\n250 Address'

    controller = Mock()
    controller.msg.return_value = mocking.get_message(response)

    interpreter = ControlInterpretor(controller)

    self.assertEqual('\x1b[34m%s\x1b[0m\n' % response, interpreter.run_command('GETCONF log address'))
    controller.msg.assert_called_with('GETCONF log address')

  def test_setevents(self):
    controller = Mock()
    controller.msg.return_value = mocking.get_message('250 OK')

    interpreter = ControlInterpretor(controller)

    self.assertEqual('\x1b[34m250 OK\x1b[0m\n', interpreter.run_command('SETEVENTS BW'))