1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
|
"""
Integration tests for stem.descriptor.server_descriptor.
"""
import datetime
import os
import tarfile
import unittest
import stem.control
import stem.descriptor
import stem.descriptor.server_descriptor
import stem.exit_policy
import stem.version
import test.runner
from test.integ.descriptor import get_resource
# Archive of server descriptors that lives in the 'data' directory next to
# this module.
TARFILE_PATH = os.path.join(
  os.path.dirname(__file__),
  'data',
  'descriptor_archive.tar',
)

# Fingerprints the tarfile tests expect to get back when parsing TARFILE_PATH.
TARFILE_FINGERPRINTS = set((
  u'B6D83EC2D9E18B0A7A33428F8CFA9C536769E209',
  u'E0BD57A11F00041A9789577C53A1B784473669E4',
  u'1F43EE37A0670301AD9CB555D94AFEC2C89FDE86',
))
class TestServerDescriptor(unittest.TestCase):
  """
  Integration tests that run stem.descriptor.parse_file() over server
  descriptor resources (a tarfile archive, individual descriptor files, and
  the test runner's cached descriptors) and check the parsed attributes.
  """

  def test_with_tarfile_path(self):
    """
    Fetch server descriptors via parse_file() for a tarfile path.
    """

    descriptors = list(stem.descriptor.parse_file(TARFILE_PATH))
    self.assertEqual(3, len(descriptors))

    fingerprints = set([desc.fingerprint for desc in descriptors])
    self.assertEqual(TARFILE_FINGERPRINTS, fingerprints)

  def test_with_tarfile_object(self):
    """
    Fetch server descriptors via parse_file() for a tarfile object.
    """

    # TODO: When dropping python 2.6 support we can go back to using the 'with'
    # keyword here.

    tar_file = tarfile.open(TARFILE_PATH)

    # try/finally so the archive is closed even if an assertion below fails
    try:
      descriptors = list(stem.descriptor.parse_file(tar_file))
      self.assertEqual(3, len(descriptors))

      fingerprints = set([desc.fingerprint for desc in descriptors])
      self.assertEqual(TARFILE_FINGERPRINTS, fingerprints)
    finally:
      tar_file.close()

  def test_metrics_descriptor(self):
    """
    Parses and checks our results against a server descriptor from metrics.
    """

    expected_family = set([
      '$0CE3CFB1E9CC47B63EA8869813BF6FAB7D4540C1',
      '$1FD187E8F69A9B74C9202DC16A25B9E7744AB9F6',
      '$74FB5EFA6A46DE4060431D515DC9A790E6AD9A7C',
      '$77001D8DA9BF445B0F81AA427A675F570D222E6A',
      '$B6D83EC2D9E18B0A7A33428F8CFA9C536769E209',
      '$D2F37F46182C23AB747787FD657E680B34EAF892',
      '$E0BD57A11F00041A9789577C53A1B784473669E4',
      '$E5E3E9A472EAF7BE9682B86E92305DB4C71048EF',
    ])

    expected_onion_key = """-----BEGIN RSA PUBLIC KEY-----
MIGJAoGBAJv5IIWQ+WDWYUdyA/0L8qbIkEVH/cwryZWoIaPAzINfrw1WfNZGtBmg
skFtXhOHHqTRN4GPPrZsAIUOQGzQtGb66IQgT4tO/pj+P6QmSCCdTfhvGfgTCsC+
WPi4Fl2qryzTb3QO5r5x7T8OsG2IBUET1bLQzmtbC560SYR49IvVAgMBAAE=
-----END RSA PUBLIC KEY-----"""

    expected_signing_key = """-----BEGIN RSA PUBLIC KEY-----
MIGJAoGBAKwvOXyztVKnuYvpTKt+nS3XIKeO8dVungi8qGoeS+6gkR6lDtGfBTjd
uE9UIkdAl9zi8/1Ic2wsUNHE9jiS0VgeupITGZY8YOyMJJ/xtV1cqgiWhq1dUYaq
51TOtUogtAPgXPh4J+V8HbFFIcCzIh3qCO/xXo+DSHhv7SSif1VpAgMBAAE=
-----END RSA PUBLIC KEY-----"""

    expected_signature = """-----BEGIN SIGNATURE-----
dskLSPz8beUW7bzwDjR6EVNGpyoZde83Ejvau+5F2c6cGnlu91fiZN3suE88iE6e
758b9ldq5eh5mapb8vuuV3uO+0Xsud7IEOqfxdkmk0GKnUX8ouru7DSIUzUL0zqq
Qlx9HNCqCY877ztFRC624ja2ql6A2hBcuoYMbkHjcQ4=
-----END SIGNATURE-----"""

    # only the first descriptor in the file is checked
    with open(get_resource('example_descriptor'), 'rb') as descriptor_file:
      desc = next(stem.descriptor.parse_file(descriptor_file, 'server-descriptor 1.0'))

    self.assertEqual('caerSidi', desc.nickname)
    self.assertEqual('A7569A83B5706AB1B1A9CB52EFF7D2D32E4553EB', desc.fingerprint)
    self.assertEqual('71.35.133.197', desc.address)
    self.assertEqual(9001, desc.or_port)
    self.assertEqual(None, desc.socks_port)
    self.assertEqual(None, desc.dir_port)
    self.assertEqual(b'Tor 0.2.1.30 on Linux x86_64', desc.platform)
    self.assertEqual(stem.version.Version('0.2.1.30'), desc.tor_version)
    self.assertEqual('Linux x86_64', desc.operating_system)
    self.assertEqual(588217, desc.uptime)
    self.assertEqual(datetime.datetime(2012, 3, 1, 17, 15, 27), desc.published)
    self.assertEqual(b'www.atagar.com/contact', desc.contact)
    self.assertEqual(['1', '2'], desc.link_protocols)
    self.assertEqual(['1'], desc.circuit_protocols)
    self.assertEqual(False, desc.hibernating)
    self.assertEqual(False, desc.allow_single_hop_exits)
    self.assertEqual(False, desc.extra_info_cache)
    self.assertEqual('D225B728768D7EA4B5587C13A7A9D22EBBEE6E66', desc.extra_info_digest)
    self.assertEqual(['2'], desc.hidden_service_dir)
    self.assertEqual(expected_family, desc.family)
    self.assertEqual(153600, desc.average_bandwidth)
    self.assertEqual(256000, desc.burst_bandwidth)
    self.assertEqual(104590, desc.observed_bandwidth)
    self.assertEqual(stem.exit_policy.ExitPolicy('reject *:*'), desc.exit_policy)
    self.assertEqual(expected_onion_key, desc.onion_key)
    self.assertEqual(expected_signing_key, desc.signing_key)
    self.assertEqual(expected_signature, desc.signature)
    self.assertEqual([], desc.get_unrecognized_lines())
    self.assertEqual('2C7B27BEAB04B4E2459D89CA6D5CD1CC5F95A689', desc.digest())

  def test_metrics_descriptor_multiple(self):
    """
    Parses a metrics resource containing multiple server descriptors and
    checks the nickname and fingerprint of each.
    """

    with open(get_resource('metrics_server_desc_multiple'), 'rb') as descriptor_file:
      descriptors = list(stem.descriptor.parse_file(descriptor_file, 'server-descriptor 1.0'))

    self.assertEqual(2, len(descriptors))

    self.assertEqual('anonion', descriptors[0].nickname)
    self.assertEqual('9A5EC5BB866517E53962AF4D3E776536694B069E', descriptors[0].fingerprint)

    self.assertEqual('Unnamed', descriptors[1].nickname)
    self.assertEqual('5366F1D198759F8894EA6E5FF768C667F59AFD24', descriptors[1].fingerprint)

  def test_old_descriptor(self):
    """
    Parses a relay server descriptor from 2005.
    """

    with open(get_resource('old_descriptor'), 'rb') as descriptor_file:
      desc = next(stem.descriptor.parse_file(descriptor_file, 'server-descriptor 1.0'))

    self.assertEqual('krypton', desc.nickname)
    self.assertEqual('3E2F63E2356F52318B536A12B6445373808A5D6C', desc.fingerprint)
    self.assertEqual('212.37.39.59', desc.address)
    self.assertEqual(8000, desc.or_port)
    self.assertEqual(None, desc.socks_port)
    self.assertEqual(None, desc.dir_port)
    self.assertEqual(b'Tor 0.1.0.14 on FreeBSD i386', desc.platform)
    self.assertEqual(stem.version.Version('0.1.0.14'), desc.tor_version)
    self.assertEqual('FreeBSD i386', desc.operating_system)
    self.assertEqual(64820, desc.uptime)
    self.assertEqual(datetime.datetime(2005, 12, 16, 18, 1, 3), desc.published)
    self.assertEqual(None, desc.contact)
    self.assertEqual(None, desc.link_protocols)
    self.assertEqual(None, desc.circuit_protocols)
    self.assertEqual(True, desc.hibernating)
    self.assertEqual(False, desc.allow_single_hop_exits)
    self.assertEqual(False, desc.extra_info_cache)
    self.assertEqual(None, desc.extra_info_digest)
    self.assertEqual(None, desc.hidden_service_dir)
    self.assertEqual(set(), desc.family)
    self.assertEqual(102400, desc.average_bandwidth)
    self.assertEqual(10485760, desc.burst_bandwidth)
    self.assertEqual(0, desc.observed_bandwidth)
    self.assertEqual(datetime.datetime(2005, 12, 16, 18, 0, 48), desc.read_history_end)
    self.assertEqual(900, desc.read_history_interval)
    self.assertEqual(datetime.datetime(2005, 12, 16, 18, 0, 48), desc.write_history_end)
    self.assertEqual(900, desc.write_history_interval)
    self.assertEqual([], desc.get_unrecognized_lines())

    # The read-history and write-history lines are pretty long so just checking
    # the initial contents for the line and parsed values.

    read_values_start = [20774, 489973, 510022, 511163, 20949]
    self.assertEqual(read_values_start, desc.read_history_values[:5])

    write_values_start = [81, 8848, 8927, 8927, 83, 8848, 8931, 8929, 81, 8846]
    self.assertEqual(write_values_start, desc.write_history_values[:10])

  def test_cached_descriptor(self):
    """
    Parses the cached descriptor file in our data directory, checking that it
    doesn't raise any validation issues and looking for unrecognized descriptor
    additions.
    """

    # lengthy test and unaffected by targets, so only run once

    if test.runner.only_run_once(self, 'test_cached_descriptor'):
      return

    descriptor_path = test.runner.get_runner().get_test_dir('cached-descriptors')

    if not os.path.exists(descriptor_path):
      test.runner.skip(self, '(no cached descriptors)')
      return

    with open(descriptor_path, 'rb') as descriptor_file:
      for desc in stem.descriptor.parse_file(descriptor_file, 'server-descriptor 1.0'):
        # the following attributes should be deprecated, and not appear in the wild
        self.assertEqual(None, desc.read_history_end)
        self.assertEqual(None, desc.write_history_end)
        self.assertEqual(None, desc.eventdns)
        self.assertEqual(None, desc.socks_port)

        unrecognized_lines = desc.get_unrecognized_lines()

        if unrecognized_lines:
          # TODO: This isn't actually a problem, and rather than failing we
          # should alert the user about these entries at the end of the tests
          # (along with new events, getinfo options, and such). For now though
          # there doesn't seem to be anything in practice to trigger this so
          # failing to get our attention if it does.

          self.fail('Unrecognized descriptor content: %s' % unrecognized_lines)

  def test_non_ascii_descriptor(self):
    """
    Parses a descriptor with non-ascii content.
    """

    expected_contact = b'1024D/04D2E818 L\xc3\xa9na\xc3\xafc Huard <lenaic dot huard AT laposte dot net>'

    with open(get_resource('non-ascii_descriptor'), 'rb') as descriptor_file:
      desc = next(stem.descriptor.parse_file(descriptor_file, 'server-descriptor 1.0'))

    self.assertEqual('Coruscant', desc.nickname)
    self.assertEqual('0B9821545C48E496AEED9ECC0DB506C49FF8158D', desc.fingerprint)
    self.assertEqual('88.182.161.122', desc.address)
    self.assertEqual(9001, desc.or_port)
    self.assertEqual(None, desc.socks_port)
    self.assertEqual(9030, desc.dir_port)
    self.assertEqual(b'Tor 0.2.3.25 on Linux', desc.platform)
    self.assertEqual(stem.version.Version('0.2.3.25'), desc.tor_version)
    self.assertEqual('Linux', desc.operating_system)
    self.assertEqual(259738, desc.uptime)
    self.assertEqual(datetime.datetime(2013, 5, 18, 11, 16, 19), desc.published)
    self.assertEqual(expected_contact, desc.contact)
    self.assertEqual(['1', '2'], desc.link_protocols)
    self.assertEqual(['1'], desc.circuit_protocols)
    self.assertEqual(False, desc.hibernating)
    self.assertEqual(False, desc.allow_single_hop_exits)
    self.assertEqual(False, desc.extra_info_cache)
    self.assertEqual('56403D838DE152421CD401B8E57DAD4483A3D56B', desc.extra_info_digest)
    self.assertEqual(['2'], desc.hidden_service_dir)
    self.assertEqual(set(), desc.family)
    self.assertEqual(102400, desc.average_bandwidth)
    self.assertEqual(204800, desc.burst_bandwidth)
    self.assertEqual(122818, desc.observed_bandwidth)
    self.assertEqual(stem.exit_policy.ExitPolicy('reject *:*'), desc.exit_policy)
    self.assertEqual([], desc.get_unrecognized_lines())

    # Make sure that we can get a string representation for this descriptor
    # (having non-unicode content risks a UnicodeEncodeError)...
    #
    # https://trac.torproject.org/8265

    self.assertTrue(isinstance(str(desc), str))

  def test_cr_in_contact_line(self):
    """
    Parses a descriptor with a huge contact line containing anomalous carriage
    returns ('\\r' entries).
    """

    with open(get_resource('cr_in_contact_line'), 'rb') as descriptor_file:
      desc = next(stem.descriptor.parse_file(descriptor_file, 'server-descriptor 1.0'))

    self.assertEqual('pogonip', desc.nickname)
    self.assertEqual('6DABD62BC65D4E6FE620293157FC76968DAB9C9B', desc.fingerprint)
    self.assertEqual('75.5.248.48', desc.address)

    # the contact info block is huge so just checking the start and end,
    # including some of the embedded carriage returns

    contact_start = b'jie1 at pacbell dot net -----BEGIN PGP PUBLIC KEY BLOCK-----\rVersion:'
    contact_end = b'YFRk3NhCY=\r=Xaw3\r-----END PGP PUBLIC KEY BLOCK-----'

    self.assertTrue(desc.contact.startswith(contact_start))
    self.assertTrue(desc.contact.endswith(contact_end))

  def test_negative_uptime(self):
    """
    Parses a descriptor where we are tolerant of a negative uptime, and another
    where we shouldn't be.
    """

    with open(get_resource('negative_uptime'), 'rb') as descriptor_file:
      desc = next(stem.descriptor.parse_file(descriptor_file, 'server-descriptor 1.0'))

    self.assertEqual('TipTor', desc.nickname)
    self.assertEqual('137962D4931DBF08A24E843288B8A155D6D2AEDD', desc.fingerprint)
    self.assertEqual('62.99.247.83', desc.address)

    # modify the relay version so it's after when the negative uptime bug
    # should appear

    descriptor_contents = str(desc).replace('Tor 0.1.1.25', 'Tor 0.1.2.7')
    self.assertRaises(ValueError, stem.descriptor.server_descriptor.RelayDescriptor, descriptor_contents)

  def test_bridge_descriptor(self):
    """
    Parses a bridge descriptor.
    """

    expected_family = set([
      '$CE396C72A3D0880F74C064FEA79D68C15BD380B9',
      '$AB8B00C00B1347BA80A88E548FAC9EDF701D7D0E',
      '$8C8A470D7C23151665A7B84E75E89FCC205A3304',
    ])

    with open(get_resource('bridge_descriptor'), 'rb') as descriptor_file:
      desc = next(stem.descriptor.parse_file(descriptor_file, 'bridge-server-descriptor 1.0'))

    self.assertEqual('Unnamed', desc.nickname)
    self.assertEqual('AE54E28ED069CDF45F3009F963EE3B3D6FA26A2E', desc.fingerprint)
    self.assertEqual('10.45.227.253', desc.address)
    self.assertEqual(9001, desc.or_port)
    self.assertEqual(None, desc.socks_port)
    self.assertEqual(None, desc.dir_port)
    self.assertEqual(b'Tor 0.2.3.12-alpha (git-800942b4176ca31c) on Linux x86_64', desc.platform)
    self.assertEqual(stem.version.Version('0.2.3.12-alpha'), desc.tor_version)
    self.assertEqual('Linux x86_64', desc.operating_system)
    self.assertEqual(186, desc.uptime)
    self.assertEqual(datetime.datetime(2012, 3, 22, 17, 34, 38), desc.published)
    self.assertEqual(b'somebody', desc.contact)
    self.assertEqual(['1', '2'], desc.link_protocols)
    self.assertEqual(['1'], desc.circuit_protocols)
    self.assertEqual(False, desc.hibernating)
    self.assertEqual(False, desc.allow_single_hop_exits)
    self.assertEqual(False, desc.extra_info_cache)
    self.assertEqual('134F81F7A0D270B85FCD481DD10CEA34BA7B15C9', desc.extra_info_digest)
    self.assertEqual(['2'], desc.hidden_service_dir)
    self.assertEqual(expected_family, desc.family)
    self.assertEqual(409600, desc.average_bandwidth)
    self.assertEqual(819200, desc.burst_bandwidth)
    self.assertEqual(5120, desc.observed_bandwidth)
    self.assertEqual(stem.exit_policy.ExitPolicy('reject *:*'), desc.exit_policy)
    self.assertEqual('006FD96BA35E7785A6A3B8B75FE2E2435A13BDB4', desc.digest())
    self.assertEqual([], desc.get_unrecognized_lines())
|