File: connection_tracker.py

package info (click to toggle)
nyx 2.1.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,732 kB
  • sloc: python: 7,055; makefile: 7; sh: 3
file content (122 lines) | stat: -rw-r--r-- 4,886 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import time
import unittest

from nyx.tracker import ConnectionTracker

from stem.util import connection

try:
  # added in python 3.3
  from unittest.mock import Mock, patch
except ImportError:
  from mock import Mock, patch

# Sample connections that the tests below hand back from the mocked
# connection resolver. The tests compare results by remote_address and depend
# on this ordering (they use slices such as STEM_CONNECTIONS[:2] and index 0).

STEM_CONNECTIONS = [
  connection.Connection('127.0.0.1', 3531, '75.119.206.243', 22, 'tcp', False),
  connection.Connection('127.0.0.1', 1766, '86.59.30.40', 443, 'tcp', False),
  connection.Connection('127.0.0.1', 1059, '74.125.28.106', 80, 'tcp', False)
]


class TestConnectionTracker(unittest.TestCase):
  """
  Exercises nyx's ConnectionTracker against mocked connection resolution.

  Every test patches the tor controller, the system helper, proc availability
  and the list of system resolvers. Patches given an explicit replacement
  Mock inject no argument, so each test receives just two mocks: first
  'get_value_mock' (which stands in for nyx.tracker.connection.get_connections,
  the innermost argument-injecting patch) and then 'tor_controller_mock'.

  These tests are timing based: they sleep for short periods and then check
  what the tracker's background daemon has done in the meantime.
  """

  # Checks that the tracker provides the resolver's connections, and that its
  # results are replaced once the resolver starts reporting nothing.

  @patch('nyx.tracker.tor_controller')
  @patch('nyx.tracker.connection.get_connections')
  @patch('nyx.tracker.system', Mock(return_value = Mock()))
  @patch('stem.util.proc.is_available', Mock(return_value = False))
  @patch('nyx.tracker.connection.system_resolvers', Mock(return_value = [connection.Resolver.NETSTAT]))
  def test_fetching_connections(self, get_value_mock, tor_controller_mock):
    tor_controller_mock().get_pid.return_value = 12345
    tor_controller_mock().get_conf.return_value = '0'
    get_value_mock.return_value = STEM_CONNECTIONS

    # The constructor argument is presumably the polling rate in seconds
    # (TODO confirm). The sleeps are sized so that exactly one run has
    # happened at the first check and exactly two at the second.

    with ConnectionTracker(0.04) as daemon:
      time.sleep(0.01)

      connections = daemon.get_value()

      self.assertEqual(1, daemon.run_counter())
      self.assertEqual([conn.remote_address for conn in STEM_CONNECTIONS], [conn.remote_address for conn in connections])

      get_value_mock.return_value = []  # no connection results
      time.sleep(0.05)
      connections = daemon.get_value()

      self.assertEqual(2, daemon.run_counter())
      self.assertEqual([], connections)

  # Checks that resolvers which keep failing are dropped one at a time, that
  # the tracker stops querying once none remain, and that picking a custom
  # resolver makes it query again.

  @patch('nyx.tracker.tor_controller')
  @patch('nyx.tracker.connection.get_connections')
  @patch('nyx.tracker.system', Mock(return_value = Mock()))
  @patch('stem.util.proc.is_available', Mock(return_value = False))
  @patch('nyx.tracker.connection.system_resolvers', Mock(return_value = [connection.Resolver.NETSTAT, connection.Resolver.LSOF]))
  def test_resolver_failover(self, get_value_mock, tor_controller_mock):
    tor_controller_mock().get_pid.return_value = 12345
    tor_controller_mock().get_conf.return_value = '0'
    get_value_mock.side_effect = IOError()  # every resolution attempt fails

    with ConnectionTracker(0.01) as daemon:
      time.sleep(0.015)

      # Too few failures so far, both resolvers are still candidates.

      self.assertEqual([connection.Resolver.NETSTAT, connection.Resolver.LSOF], daemon._resolvers)
      self.assertEqual([], daemon.get_value())

      time.sleep(0.025)

      # NETSTAT has been given up on, leaving only LSOF.

      self.assertEqual([connection.Resolver.LSOF], daemon._resolvers)
      self.assertEqual([], daemon.get_value())

      time.sleep(0.035)

      # LSOF has been given up on too, so no resolvers remain.

      self.assertEqual([], daemon._resolvers)
      self.assertEqual([], daemon.get_value())

      # Now make connection resolution work. We still shouldn't provide any
      # results since we stopped looking.

      get_value_mock.return_value = STEM_CONNECTIONS[:2]
      get_value_mock.side_effect = None
      time.sleep(0.05)
      self.assertEqual([], daemon.get_value())

      # Finally, select a custom resolver. This should cause us to query again
      # regardless of our prior failures.

      daemon.set_custom_resolver(connection.Resolver.NETSTAT)
      time.sleep(0.05)
      self.assertEqual([conn.remote_address for conn in STEM_CONNECTIONS[:2]], [conn.remote_address for conn in daemon.get_value()])

  # Checks the start_time and is_legacy attributes of tracked connections: the
  # connection seen on the first run is flagged as legacy and keeps its start
  # time, while one that shows up on a later run is not legacy and gets a
  # later start time.

  @patch('nyx.tracker.tor_controller')
  @patch('nyx.tracker.connection.get_connections')
  @patch('nyx.tracker.system', Mock(return_value = Mock()))
  @patch('stem.util.proc.is_available', Mock(return_value = False))
  @patch('nyx.tracker.connection.system_resolvers', Mock(return_value = [connection.Resolver.NETSTAT]))
  def test_tracking_uptime(self, get_value_mock, tor_controller_mock):
    tor_controller_mock().get_pid.return_value = 12345
    tor_controller_mock().get_conf.return_value = '0'
    get_value_mock.return_value = [STEM_CONNECTIONS[0]]
    first_start_time = time.time()

    with ConnectionTracker(0.04) as daemon:
      time.sleep(0.01)

      connections = daemon.get_value()
      self.assertEqual(1, len(connections))

      self.assertEqual(STEM_CONNECTIONS[0].remote_address, connections[0].remote_address)
      self.assertTrue(first_start_time <= connections[0].start_time <= time.time())
      self.assertTrue(connections[0].is_legacy)

      second_start_time = time.time()
      get_value_mock.return_value = STEM_CONNECTIONS[:2]
      time.sleep(0.05)

      connections = daemon.get_value()
      self.assertEqual(2, len(connections))

      # The lower bound is non-strict to match the check above: it compares
      # the same two values, so a strict '<' would fail whenever the clock
      # reported equal timestamps even though the earlier assertion passed.

      self.assertEqual(STEM_CONNECTIONS[0].remote_address, connections[0].remote_address)
      self.assertTrue(first_start_time <= connections[0].start_time < time.time())
      self.assertTrue(connections[0].is_legacy)

      self.assertEqual(STEM_CONNECTIONS[1].remote_address, connections[1].remote_address)
      self.assertTrue(second_start_time < connections[1].start_time < time.time())
      self.assertFalse(connections[1].is_legacy)