File: control_socket.py

package info (click to toggle)
python-stem 1.4.1b-1~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 5,132 kB
  • sloc: python: 22,776; makefile: 130; sh: 3
file content (159 lines) | stat: -rw-r--r-- 5,369 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
Integration tests for the stem.socket.ControlSocket subclasses. When run, these
test basic functionality for a ControlPort *or* ControlSocketFile, depending on
which can connect to our tor instance.

These tests share a similar domain with those for the ControlMessage, but where
those focus on parsing and correctness of the content these are more concerned
with the behavior of the socket itself.
"""

import time
import unittest

import stem.connection
import stem.control
import stem.socket
import test.runner

from test.runner import require_controller


class TestControlSocket(unittest.TestCase):
  """
  Integration tests that drive a control socket obtained from the test
  runner. Every test is gated by the require_controller decorator.
  """

  @require_controller
  def test_connection_time(self):
    """
    Confirms that connection_time() reflects the moment of our most recent
    connect or disconnect, and holds steady in between.
    """

    started_at = time.time()

    with test.runner.get_runner().get_tor_socket() as sock:
      connected_at = sock.connection_time()

      # we connected at some point after this test began, but no later than now

      connected_during_test = started_at <= connected_at <= time.time()
      self.assertTrue(connected_during_test)

      # the timestamp is absolute, so waiting a bit shouldn't alter it

      time.sleep(0.1)
      self.assertEqual(connected_at, sock.connection_time())

      # detaching should swap it for the time of the disconnection

      sock.close()
      closed_at = sock.connection_time()
      closed_after_connect = connected_at < closed_at <= time.time()
      self.assertTrue(closed_after_connect)

      # ... and reconnecting should move it forward once more

      time.sleep(0.1)
      sock.connect()
      reconnected_at = sock.connection_time()
      reconnected_after_close = closed_at < reconnected_at <= time.time()
      self.assertTrue(reconnected_after_close)

  @require_controller
  def test_send_buffered(self):
    """
    Queues up a batch of requests, only reading the replies once all of them
    have been sent.
    """

    runner = test.runner.get_runner()
    expected_prefix = 'version=%s' % runner.get_tor_version()
    request_count = 100

    with runner.get_tor_socket() as sock:
      for _ in range(request_count):
        sock.send('GETINFO version')

      # every queued request should have its own well formed reply

      for _ in range(request_count):
        reply = str(sock.recv())
        self.assertTrue(reply.startswith(expected_prefix))
        self.assertTrue(reply.endswith('\nOK'))

  @require_controller
  def test_send_closed(self):
    """
    Attempts to send through a socket that we have closed ourselves.
    """

    runner = test.runner.get_runner()

    with runner.get_tor_socket() as sock:
      self.assertTrue(sock.is_alive())
      sock.close()
      self.assertFalse(sock.is_alive())

      self.assertRaises(stem.SocketClosed, sock.send, 'blarg')

  @require_controller
  def test_send_disconnected(self):
    """
    Attempts to send through a socket after tor has hung up on us.

    What we observe depends on the kind of connection. A control port doesn't
    notice the disconnect (is_alive() keeps returning True) until a recv()
    call has failed, whereas a socket file also fails on send().
    """

    runner = test.runner.get_runner()

    with runner.get_tor_socket() as sock:
      self._quit_and_check_reply(sock)
      self.assertTrue(sock.is_alive())

      # A further send over a port based socket appears to work, while a file
      # based socket is expected to raise and be marked as no longer alive.

      port_based = isinstance(sock, stem.socket.ControlPort)

      if port_based:
        sock.send('blarg')
        self.assertTrue(sock.is_alive())
      else:
        self.assertRaises(stem.SocketClosed, sock.send, 'blarg')
        self.assertFalse(sock.is_alive())

  @require_controller
  def test_recv_closed(self):
    """
    Attempts to receive from a socket that we have closed ourselves.
    """

    runner = test.runner.get_runner()

    with runner.get_tor_socket() as sock:
      self.assertTrue(sock.is_alive())
      sock.close()
      self.assertFalse(sock.is_alive())

      self.assertRaises(stem.SocketClosed, sock.recv)

  @require_controller
  def test_recv_disconnected(self):
    """
    Attempts to receive from a socket after tor has hung up on us.
    """

    runner = test.runner.get_runner()

    with runner.get_tor_socket() as sock:
      self._quit_and_check_reply(sock)

      # At this point neither kind of socket is aware that tor dropped the
      # connection. The failed recv() below is what should reveal it.

      self.assertTrue(sock.is_alive())
      self.assertRaises(stem.SocketClosed, sock.recv)
      self.assertFalse(sock.is_alive())

  @require_controller
  def test_connect_repeatedly(self):
    """
    Cycles a single socket through being used, closed, and reconnected
    several times over.
    """

    runner = test.runner.get_runner()
    cycle_count = 10

    with runner.get_tor_socket(False) as sock:
      for _ in range(cycle_count):
        # a failing PROTOCOLINFO query surfaces here as an exception
        stem.connection.get_protocolinfo(sock)

        sock.close()
        self.assertRaises(stem.SocketClosed, sock.send, 'PROTOCOLINFO 1')
        sock.connect()

  def _quit_and_check_reply(self, sock):
    """
    Issues a QUIT over the given socket and asserts that the reply we read
    back is tor's closing message.

    :param sock: connected control socket to send the QUIT through
    """

    sock.send('QUIT')
    farewell = str(sock.recv())
    self.assertEqual('closing connection', farewell)