# Copyright 2013-2014, Damian Johnson and The Tor Project
# See LICENSE for licensing information
"""
Parsing for `TorDNSEL <https://www.torproject.org/projects/tordnsel.html.en>`_
exit list files.
"""
import datetime

import stem.util.connection
import stem.util.str_tools
import stem.util.tor_tools

from stem.descriptor import (
  Descriptor,
  _read_until_keywords,
  _get_descriptor_components,
)


def _parse_file(tordnsel_file, validate = True, **kwargs):
  """
  Iterates over a tordnsel file.

  :param file tordnsel_file: file with descriptor content
  :param bool validate: checks the validity of the descriptor's content if
    **True**, skips these checks otherwise
  :param dict kwargs: additional arguments for the descriptor constructor

  :returns: iterator for :class:`~stem.descriptor.tordnsel.TorDNSEL`
    instances in the file

  :raises:
    * **ValueError** if the contents are malformed and validate is **True**
    * **IOError** if the file can't be read
  """

  # skip content prior to the first ExitNode
  _read_until_keywords('ExitNode', tordnsel_file, skip = True)

  while True:
    # each entry runs from its ExitNode line up to the next ExitNode line
    contents = _read_until_keywords('ExitAddress', tordnsel_file)
    contents += _read_until_keywords('ExitNode', tordnsel_file)

    if contents:
      yield TorDNSEL(bytes.join(b'', contents), validate, **kwargs)
    else:
      break  # done parsing file


class TorDNSEL(Descriptor):
  """
  TorDNSEL descriptor (`exitlist specification
  <https://www.torproject.org/tordnsel/exitlist-spec.txt>`_)

  :var str fingerprint: **\*** exit relay's fingerprint
  :var datetime published: **\*** time in UTC when this descriptor was made
  :var datetime last_status: **\*** time in UTC when the relay was seen in a
    v2 network status
  :var list exit_addresses: **\*** list of (str address, datetime date) tuples
    consisting of the found IPv4 exit address and the time it was found

  **\*** attribute is either required when we're parsed with validation or has
  a default value, others are left as **None** if undefined
  """

  def __init__(self, raw_contents, validate):
    super(TorDNSEL, self).__init__(raw_contents)
    raw_contents = stem.util.str_tools._to_unicode(raw_contents)
    entries = _get_descriptor_components(raw_contents, validate)

    self.fingerprint = None
    self.published = None
    self.last_status = None
    self.exit_addresses = []

    self._parse(entries, validate)

  def _parse(self, entries, validate):
    for keyword, values in entries.items():
      value, block_content = values[0]

      if validate and block_content:
        raise ValueError('Unexpected block content: %s' % block_content)

      if keyword == 'ExitNode':
        if validate and not stem.util.tor_tools.is_valid_fingerprint(value):
          raise ValueError('Tor relay fingerprints consist of forty hex digits: %s' % value)

        self.fingerprint = value
      elif keyword == 'Published':
        try:
          self.published = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        except ValueError:
          if validate:
            raise ValueError("Published time wasn't parsable: %s" % value)
      elif keyword == 'LastStatus':
        try:
          self.last_status = datetime.datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        except ValueError:
          if validate:
            raise ValueError("LastStatus time wasn't parsable: %s" % value)
      elif keyword == 'ExitAddress':
        # ExitAddress can appear multiple times, so handle every entry
        for value, block_content in values:
          if ' ' not in value:
            # without this check the unpacking below would raise a ValueError
            # even when we aren't validating
            if validate:
              raise ValueError("ExitAddress needs an address followed by a time: %s" % value)

            continue

          address, date = value.split(' ', 1)

          if validate:
            if not stem.util.connection.is_valid_ipv4_address(address):
              raise ValueError("ExitAddress isn't a valid IPv4 address: %s" % address)
            elif block_content:
              raise ValueError('Unexpected block content: %s' % block_content)

          try:
            date = datetime.datetime.strptime(date, '%Y-%m-%d %H:%M:%S')
            self.exit_addresses.append((address, date))
          except ValueError:
            if validate:
              raise ValueError("ExitAddress found time wasn't parsable: %s" % value)
      elif validate:
        raise ValueError('Unrecognized keyword: %s' % keyword)