File: control_message.py

package info (click to toggle)
python-stem 1.2.2-1.1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 4,568 kB
  • ctags: 2,036
  • sloc: python: 20,108; makefile: 127; sh: 3
file content (197 lines) | stat: -rw-r--r-- 7,205 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""
Unit tests for the stem.response.ControlMessage parsing and class.
"""

import socket
import StringIO
import unittest

import stem.socket
import stem.response
import stem.response.getinfo

OK_REPLY = '250 OK\r\n'

EVENT_BW = '650 BW 32326 2856\r\n'
EVENT_CIRC_TIMEOUT = '650 CIRC 5 FAILED PURPOSE=GENERAL REASON=TIMEOUT\r\n'
EVENT_CIRC_LAUNCHED = '650 CIRC 9 LAUNCHED PURPOSE=GENERAL\r\n'
EVENT_CIRC_EXTENDED = '650 CIRC 5 EXTENDED $A200F527C82C59A25CCA44884B49D3D65B122652=faktor PURPOSE=MEASURE_TIMEOUT\r\n'

GETINFO_VERSION = """250-version=0.2.2.23-alpha (git-b85eb949b528f4d7)
250 OK
""".replace('\n', '\r\n')

GETINFO_INFONAMES = """250+info/names=
accounting/bytes -- Number of bytes read/written so far in the accounting interval.
accounting/bytes-left -- Number of bytes left to write/read so far in the accounting interval.
accounting/enabled -- Is accounting currently enabled?
accounting/hibernating -- Are we hibernating or awake?
stream-status -- List of current streams.
version -- The current version of Tor.
.
250 OK
""".replace('\n', '\r\n')


class TestControlMessage(unittest.TestCase):
  def test_from_str(self):
    msg = stem.response.ControlMessage.from_str(GETINFO_VERSION)

    self.assertTrue(isinstance(msg, stem.response.ControlMessage))
    self.assertEqual('version=0.2.2.23-alpha (git-b85eb949b528f4d7)\nOK', str(msg))

    msg = stem.response.ControlMessage.from_str(GETINFO_VERSION, 'GETINFO')
    self.assertTrue(isinstance(msg, stem.response.getinfo.GetInfoResponse))
    self.assertEqual({'version': b'0.2.2.23-alpha (git-b85eb949b528f4d7)'}, msg.entries)

  def test_ok_response(self):
    """
    Checks the basic 'OK' response that we get for most commands.
    """

    message = self._assert_message_parses(OK_REPLY)
    self.assertEquals('OK', str(message))

    contents = message.content()
    self.assertEquals(1, len(contents))
    self.assertEquals(('250', ' ', 'OK'), contents[0])

  def test_event_response(self):
    """
    Checks parsing of actual events.
    """

    # BW event
    message = self._assert_message_parses(EVENT_BW)
    self.assertEquals('BW 32326 2856', str(message))

    contents = message.content()
    self.assertEquals(1, len(contents))
    self.assertEquals(('650', ' ', 'BW 32326 2856'), contents[0])

    # few types of CIRC events
    for circ_content in (EVENT_CIRC_TIMEOUT, EVENT_CIRC_LAUNCHED, EVENT_CIRC_EXTENDED):
      message = self._assert_message_parses(circ_content)
      self.assertEquals(circ_content[4:-2], str(message))

      contents = message.content()
      self.assertEquals(1, len(contents))
      self.assertEquals(('650', ' ', str(message)), contents[0])

  def test_getinfo_response(self):
    """
    Checks parsing of actual GETINFO responses.
    """

    # GETINFO version (basic single-line results)
    message = self._assert_message_parses(GETINFO_VERSION)
    self.assertEquals(2, len(list(message)))
    self.assertEquals(2, len(str(message).splitlines()))

    # manually checks the contents
    contents = message.content()
    self.assertEquals(2, len(contents))
    self.assertEquals(('250', '-', 'version=0.2.2.23-alpha (git-b85eb949b528f4d7)'), contents[0])
    self.assertEquals(('250', ' ', 'OK'), contents[1])

    # GETINFO info/names (data entry)
    message = self._assert_message_parses(GETINFO_INFONAMES)
    self.assertEquals(2, len(list(message)))
    self.assertEquals(8, len(str(message).splitlines()))

    # manually checks the contents
    contents = message.content()
    self.assertEquals(2, len(contents))

    first_entry = (contents[0][0], contents[0][1], contents[0][2][:contents[0][2].find('\n')])
    self.assertEquals(('250', '+', 'info/names='), first_entry)
    self.assertEquals(('250', ' ', 'OK'), contents[1])

  def test_no_crlf(self):
    """
    Checks that we get a ProtocolError when we don't have both a carriage
    return and newline for line endings. This doesn't really check for
    newlines (since that's what readline would break on), but not the end of
    the world.
    """

    # Replaces each of the CRLF entries with just LF, confirming that this
    # causes a parsing error. This should test line endings for both data
    # entry parsing and non-data.

    infonames_lines = [line + '\n' for line in GETINFO_INFONAMES.splitlines()]

    for index, line in enumerate(infonames_lines):
      # replace the CRLF for the line
      infonames_lines[index] = line.rstrip('\r\n') + '\n'
      test_socket_file = StringIO.StringIO(''.join(infonames_lines))
      self.assertRaises(stem.ProtocolError, stem.socket.recv_message, test_socket_file)

      # puts the CRLF back
      infonames_lines[index] = infonames_lines[index].rstrip('\n') + '\r\n'

    # sanity check the above test isn't broken due to leaving infonames_lines
    # with invalid data

    self._assert_message_parses(''.join(infonames_lines))

  def test_malformed_prefix(self):
    """
    Checks parsing for responses where the header is missing a digit or divider.
    """

    for index in range(len(EVENT_BW)):
      # makes test input with that character missing or replaced
      removal_test_input = EVENT_BW[:index] + EVENT_BW[index + 1:]
      replacement_test_input = EVENT_BW[:index] + '#' + EVENT_BW[index + 1:]

      if index < 4 or index >= (len(EVENT_BW) - 2):
        # dropping the character should cause an error if...
        # - this is part of the message prefix
        # - this is disrupting the line ending

        self.assertRaises(stem.ProtocolError, stem.socket.recv_message, StringIO.StringIO(removal_test_input))
        self.assertRaises(stem.ProtocolError, stem.socket.recv_message, StringIO.StringIO(replacement_test_input))
      else:
        # otherwise the data will be malformed, but this goes undetected
        self._assert_message_parses(removal_test_input)
        self._assert_message_parses(replacement_test_input)

  def test_disconnected_socket(self):
    """
    Tests when the read function is given a file derived from a disconnected
    socket.
    """

    control_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    control_socket_file = control_socket.makefile()
    self.assertRaises(stem.SocketClosed, stem.socket.recv_message, control_socket_file)

  def _assert_message_parses(self, controller_reply):
    """
    Performs some basic sanity checks that a reply mirrors its parsed result.

    Returns:
      stem.response.ControlMessage for the given input
    """

    message = stem.socket.recv_message(StringIO.StringIO(controller_reply))

    # checks that the raw_content equals the input value
    self.assertEqual(controller_reply, message.raw_content())

    # checks that the contents match the input
    message_lines = str(message).splitlines()
    controller_lines = controller_reply.split('\r\n')
    controller_lines.pop()  # the ControlMessage won't have a trailing newline

    while controller_lines:
      line = controller_lines.pop(0)

      # mismatching lines with just a period are probably data termination
      if line == '.' and (not message_lines or line != message_lines[0]):
        continue

      self.assertTrue(line.endswith(message_lines.pop(0)))

    return message