File: relay_connections.py

package info (click to toggle)
python-stem 1.8.1-2.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 8,152 kB
  • sloc: python: 33,747; java: 312; makefile: 124; sh: 14
file content (130 lines) | stat: -rwxr-xr-x 3,998 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import argparse
import collections
import time

import stem.connection
import stem.util.system
import stem.util.str_tools

from stem.control import Listener
from stem.util.connection import get_connections, port_usage, is_valid_ipv4_address

HEADER_LINE = " {version}   uptime: {uptime}   flags: {flags}\n"

DIV = '+%s+%s+%s+' % ('-' * 30, '-' * 6, '-' * 6)
COLUMN = '| %-28s | %4s | %4s |'

INBOUND_ORPORT = 'Inbound to our ORPort'
INBOUND_DIRPORT = 'Inbound to our DirPort'
INBOUND_CONTROLPORT = 'Inbound to our ControlPort'

OUTBOUND_ORPORT = 'Outbound to a relay'
OUTBOUND_EXIT = 'Outbound exit traffic'
OUTBOUND_UNKNOWN = 'Outbound uncategorized'


def main():
  parser = argparse.ArgumentParser()
  parser.add_argument("--ctrlport", help="default: 9051 or 9151")
  parser.add_argument("--resolver", help="default: autodetected")
  args = parser.parse_args()

  control_port = int(args.ctrlport) if args.ctrlport else 'default'
  controller = stem.connection.connect(control_port = ('127.0.0.1', control_port))

  if not controller:
    return

  desc = controller.get_network_status(default = None)
  pid = controller.get_pid()

  print(HEADER_LINE.format(
    version = str(controller.get_version()).split()[0],
    uptime = stem.util.str_tools.short_time_label(time.time() - stem.util.system.start_time(pid)),
    flags = ', '.join(desc.flags if desc else ['none']),
  ))

  policy = controller.get_exit_policy()
  relays = {}  # address => [orports...]

  for desc in controller.get_network_statuses():
    relays.setdefault(desc.address, []).append(desc.or_port)

  # categorize our connections

  categories = collections.OrderedDict((
    (INBOUND_ORPORT, []),
    (INBOUND_DIRPORT, []),
    (INBOUND_CONTROLPORT, []),
    (OUTBOUND_ORPORT, []),
    (OUTBOUND_EXIT, []),
    (OUTBOUND_UNKNOWN, []),
  ))

  exit_connections = {}  # port => [connections]

  for conn in get_connections(resolver = args.resolver, process_pid = pid):
    if conn.protocol == 'udp':
        continue

    if conn.local_port in controller.get_ports(Listener.OR, []):
      categories[INBOUND_ORPORT].append(conn)
    elif conn.local_port in controller.get_ports(Listener.DIR, []):
      categories[INBOUND_DIRPORT].append(conn)
    elif conn.local_port in controller.get_ports(Listener.CONTROL, []):
      categories[INBOUND_CONTROLPORT].append(conn)
    elif conn.remote_port in relays.get(conn.remote_address, []):
      categories[OUTBOUND_ORPORT].append(conn)
    elif policy.can_exit_to(conn.remote_address, conn.remote_port):
      categories[OUTBOUND_EXIT].append(conn)
      exit_connections.setdefault(conn.remote_port, []).append(conn)
    else:
      categories[OUTBOUND_UNKNOWN].append(conn)

  print(DIV)
  print(COLUMN % ('Type', 'IPv4', 'IPv6'))
  print(DIV)

  total_ipv4, total_ipv6 = 0, 0

  for label, connections in categories.items():
    if len(connections) == 0:
      continue

    ipv4_count = len([conn for conn in connections if is_valid_ipv4_address(conn.remote_address)])
    ipv6_count = len(connections) - ipv4_count

    total_ipv4, total_ipv6 = total_ipv4 + ipv4_count, total_ipv6 + ipv6_count
    print(COLUMN % (label, ipv4_count, ipv6_count))

  print(DIV)
  print(COLUMN % ('Total', total_ipv4, total_ipv6))
  print(DIV)
  print('')

  if exit_connections:
    print(DIV)
    print(COLUMN % ('Exit Port', 'IPv4', 'IPv6'))
    print(DIV)

    total_ipv4, total_ipv6 = 0, 0

    for port in sorted(exit_connections):
      connections = exit_connections[port]
      ipv4_count = len([conn for conn in connections if is_valid_ipv4_address(conn.remote_address)])
      ipv6_count = len(connections) - ipv4_count
      total_ipv4, total_ipv6 = total_ipv4 + ipv4_count, total_ipv6 + ipv6_count

      usage = port_usage(port)
      label = '%s (%s)' % (port, usage) if usage else port

      print(COLUMN % (label, ipv4_count, ipv6_count))

    print(DIV)
    print(COLUMN % ('Total', total_ipv4, total_ipv6))
    print(DIV)
    print('')


if __name__ == '__main__':
  main()