File: reader.py

package info (click to toggle)
python-stem 1.2.2-1.1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 4,568 kB
  • ctags: 2,036
  • sloc: python: 20,108; makefile: 127; sh: 3
file content (568 lines) | stat: -rw-r--r-- 18,816 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
# Copyright 2012-2014, Damian Johnson and The Tor Project
# See LICENSE for licensing information

"""
Utilities for reading descriptors from local directories and archives. This is
mostly done through the :class:`~stem.descriptor.reader.DescriptorReader`
class, which is an iterator for the descriptor data in a series of
destinations. For example...

::

  my_descriptors = [
    '/tmp/server-descriptors-2012-03.tar.bz2',
    '/tmp/archived_descriptors/',
  ]

  # prints the contents of all the descriptor files
  with DescriptorReader(my_descriptors) as reader:
    for descriptor in reader:
      print descriptor

This ignores files that cannot be processed due to read errors or unparsable
content. To be notified of skipped files you can register a listener with
:func:`~stem.descriptor.reader.DescriptorReader.register_skip_listener`.

The :class:`~stem.descriptor.reader.DescriptorReader` keeps track of the last
modified timestamps for descriptor files that it has read so it can skip
unchanged files if run again. This listing of processed files can also be
persisted and applied to other
:class:`~stem.descriptor.reader.DescriptorReader` instances. For example, the
following prints descriptors as they're changed over the course of a minute,
and picks up where it left off if run again...

::

  reader = DescriptorReader(['/tmp/descriptor_data'])

  try:
    processed_files = load_processed_files('/tmp/used_descriptors')
    reader.set_processed_files(processed_files)
  except: pass # could not load, maybe this is the first run

  start_time = time.time()

  while (time.time() - start_time) < 60:
    # prints any descriptors that have changed since last checked
    with reader:
      for descriptor in reader:
        print descriptor

    time.sleep(1)

  save_processed_files('/tmp/used_descriptors', reader.get_processed_files())

**Module Overview:**

::

  load_processed_files - Loads a listing of processed files
  save_processed_files - Saves a listing of processed files

  DescriptorReader - Iterator for descriptor data on the local file system
    |- get_processed_files - provides the listing of files that we've processed
    |- set_processed_files - sets our tracking of the files we have processed
    |- register_read_listener - adds a listener for when files are read
    |- register_skip_listener - adds a listener that's notified of skipped files
    |- start - begins reading descriptor data
    |- stop - stops reading descriptor data
    |- __enter__ / __exit__ - manages the descriptor reader thread in the context
    +- __iter__ - iterates over descriptor data in unread files

  FileSkipped - Base exception for a file that was skipped
    |- AlreadyRead - We've already read a file with this last modified timestamp
    |- ParsingFailure - Contents can't be parsed as descriptor data
    |- UnrecognizedType - File extension indicates non-descriptor data
    +- ReadFailed - Wraps an error that was raised while reading the file
       +- FileMissing - File does not exist
"""

import mimetypes
import os
import Queue
import tarfile
import threading

import stem.descriptor
import stem.prereq
import stem.util.system

# Sentinel the reader thread enqueues after its last descriptor to indicate
# that it is out of descriptor files to read. Iterating over a DescriptorReader
# ends when this entry is pulled from its queue.
FINISHED = 'DONE'


class FileSkipped(Exception):
  """
  Root of the exception hierarchy for files that we could not provide
  descriptor data from.
  """


class AlreadyRead(FileSkipped):
  """
  Skipped because the file has not been modified since we last read it.

  :param int last_modified: unix timestamp for when the file was last modified
  :param int last_modified_when_read: unix timestamp for the modification time
    the file had when we last read it

  :var int last_modified: value provided to our constructor
  :var int last_modified_when_read: value provided to our constructor
  """

  def __init__(self, last_modified, last_modified_when_read):
    self.last_modified = last_modified
    self.last_modified_when_read = last_modified_when_read

    message = 'File has already been read since it was last modified. modification time: %s, last read: %s' % (last_modified, last_modified_when_read)
    super(AlreadyRead, self).__init__(message)


class ParsingFailure(FileSkipped):
  """
  Skipped because the file's contents are not parsable as descriptor data.

  :param ValueError parsing_exception: issue that arose when parsing

  :var exception: the parsing_exception that we were constructed with
  """

  def __init__(self, parsing_exception):
    # keep the underlying error around so listeners can inspect it
    self.exception = parsing_exception

    super(ParsingFailure, self).__init__(parsing_exception)


class UnrecognizedType(FileSkipped):
  """
  Skipped because the file doesn't hold descriptor data. Either its file type
  rules that out, or its content doesn't match any descriptor type we
  recognize.

  :param tuple mime_type: the (type, encoding) tuple provided by mimetypes.guess_type()

  :var tuple mime_type: the mime_type that we were constructed with
  """

  def __init__(self, mime_type):
    self.mime_type = mime_type

    # the two-value tuple fills both of the placeholders
    message = 'Unrecognized mime type: %s (%s)' % mime_type
    super(UnrecognizedType, self).__init__(message)


class ReadFailed(FileSkipped):
  """
  Skipped because an error occurred while trying to read the file.

  :param IOError read_exception: issue that arose when reading the file

  :var exception: the read_exception that we were constructed with, note that
    the FileMissing subclass provides a message string here rather than an
    exception
  """

  def __init__(self, read_exception):
    # keep the underlying error around so listeners can inspect it
    self.exception = read_exception

    super(ReadFailed, self).__init__(read_exception)


class FileMissing(ReadFailed):
  """
  Skipped because the file does not exist.
  """

  def __init__(self):
    message = 'File does not exist'
    super(FileMissing, self).__init__(message)


def load_processed_files(path):
  """
  Loads a dictionary of 'path => last modified timestamp' mappings, as
  persisted by :func:`~stem.descriptor.reader.save_processed_files`, from a
  file.

  Each non-blank line is expected to be of the form
  '<absolute path> <integer timestamp>'. Paths may themselves contain spaces
  since only the last space on a line is treated as the delimiter.

  :param str path: location to load the processed files dictionary from

  :returns: **dict** of 'path (**str**) => last modified unix timestamp
    (**int**)' mappings

  :raises:
    * **IOError** if unable to read the file
    * **TypeError** if unable to parse the file's contents
  """

  processed_files = {}

  with open(path) as input_file:
    # iterating over the file streams it rather than loading it all at once
    for line in input_file:
      line = line.strip()

      if not line:
        continue  # skip blank lines

      if ' ' not in line:
        raise TypeError('Malformed line: %s' % line)

      # named so that we don't clobber our 'path' argument
      file_path, timestamp = line.rsplit(' ', 1)

      if not os.path.isabs(file_path):
        raise TypeError("'%s' is not an absolute path" % file_path)
      elif not timestamp.isdigit():
        raise TypeError("'%s' is not an integer timestamp" % timestamp)

      processed_files[file_path] = int(timestamp)

  return processed_files


def save_processed_files(path, processed_files):
  """
  Persists a dictionary of 'path => last modified timestamp' mappings (as
  provided by the DescriptorReader's
  :func:`~stem.descriptor.reader.DescriptorReader.get_processed_files` method)
  so that they can be loaded later and applied to another
  :class:`~stem.descriptor.reader.DescriptorReader`.

  The mappings are validated before anything is written, so an invalid entry
  does not clobber a listing that already exists at the path.

  :param str path: location to save the processed files dictionary to
  :param dict processed_files: 'path => last modified' mappings

  :raises:
    * **IOError** if unable to write to the file
    * **TypeError** if processed_files is of the wrong type
  """

  # Format (and thereby validate) every entry up front. Opening the file
  # truncates it, so we only want to do that once we know we can write
  # everything.

  output_lines = []

  for file_path, timestamp in processed_files.items():
    if not os.path.isabs(file_path):
      raise TypeError('Only absolute paths are acceptable: %s' % file_path)

    output_lines.append('%s %i\n' % (file_path, timestamp))

  # Makes the parent directory if it doesn't already exist. The dirname is
  # empty when we're given a bare filename, in which case the parent is our
  # current working directory and there's nothing to create.

  path_dir = os.path.dirname(path)

  if path_dir and not os.path.exists(path_dir):
    try:
      os.makedirs(path_dir)
    except OSError as exc:
      raise IOError(exc)

  with open(path, 'w') as output_file:
    output_file.writelines(output_lines)


class DescriptorReader(object):
  """
  Iterator for the descriptor data on the local file system. This can process
  text files, tarball archives (gzip or bzip2), or recurse directories.

  Reading is done by a daemon thread that is launched by
  :func:`~stem.descriptor.reader.DescriptorReader.start` (or by entering the
  reader as a context manager). The descriptors it parses are buffered until
  our caller iterates over them.

  By default this limits the number of descriptors that we'll read ahead before
  waiting for our caller to fetch some of them. This is included to avoid
  unbounded memory usage.

  Our persistence_path argument is a convenient method to persist the listing
  of files we have processed between runs, however it doesn't allow for error
  handling. If you want that then use the
  :func:`~stem.descriptor.reader.load_processed_files` and
  :func:`~stem.descriptor.reader.save_processed_files` functions instead.

  :param str,list target: path or list of paths for files or directories to be
    read from, relative paths are expanded to absolute ones
  :param bool validate: checks the validity of the descriptor's content if
    **True**, skips these checks otherwise
  :param bool follow_links: determines if we'll follow symlinks when traversing
    directories (requires python 2.6)
  :param int buffer_size: descriptors we'll buffer before waiting for some to
    be read, this is unbounded if zero
  :param str persistence_path: if set we will load and save processed file
    listings from this path, errors are ignored
  :param stem.descriptor.__init__.DocumentHandler document_handler: method in
    which to parse :class:`~stem.descriptor.networkstatus.NetworkStatusDocument`
  :param dict kwargs: additional arguments for the descriptor constructor
  """

  def __init__(self, target, validate = True, follow_links = False, buffer_size = 100, persistence_path = None, document_handler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs):
    if isinstance(target, (bytes, unicode)):
      targets = [target]
    else:
      targets = target

    # Expand any relative paths we got. Building a new list also means that we
    # don't share (or get affected by later changes to) our caller's list.

    self._targets = [os.path.abspath(entry) for entry in targets]

    self._validate = validate
    self._follow_links = follow_links
    self._persistence_path = persistence_path
    self._document_handler = document_handler
    self._kwargs = kwargs
    self._read_listeners = []
    self._skip_listeners = []
    self._processed_files = {}

    self._reader_thread = None
    self._reader_thread_lock = threading.RLock()

    self._iter_lock = threading.RLock()
    self._iter_notice = threading.Event()

    # we're considered to be stopped until start() is called

    self._is_stopped = threading.Event()
    self._is_stopped.set()

    # Descriptors that we have read but not yet provided to the caller. A
    # FINISHED entry is used by the reading thread to indicate the end.

    self._unreturned_descriptors = Queue.Queue(buffer_size)

    if self._persistence_path:
      # Best-effort, as documented. Catching Exception rather than using a
      # bare except so KeyboardInterrupt and SystemExit still propagate.

      try:
        processed_files = load_processed_files(self._persistence_path)
        self.set_processed_files(processed_files)
      except Exception:
        pass

  def get_processed_files(self):
    """
    For each file that we have read descriptor data from this provides a
    mapping of the form...

    ::

      absolute path (str) => last modified unix timestamp (int)

    This includes entries set through the
    :func:`~stem.descriptor.reader.DescriptorReader.set_processed_files`
    method. Each run resets this to only the files that were present during
    that run.

    :returns: **dict** with the absolute paths and unix timestamp for the last
      modified times of the files we have processed
    """

    # make sure that we only provide back absolute paths
    return dict((os.path.abspath(k), v) for (k, v) in self._processed_files.items())

  def set_processed_files(self, processed_files):
    """
    Sets the listing of the files we have processed. Most often this is used
    with a newly created :class:`~stem.descriptor.reader.DescriptorReader` to
    pre-populate the listing of descriptor files that we have seen.

    :param dict processed_files: mapping of absolute paths (**str**) to unix
      timestamps for the last modified time (**int**)
    """

    # copied so later changes to the caller's dictionary don't affect us
    self._processed_files = dict(processed_files)

  def register_read_listener(self, listener):
    """
    Registers a listener for when files are read. This is executed prior to
    processing files. Listeners are expected to be of the form...

    ::

      my_listener(path)

    :param functor listener: functor to be notified when files are read
    """

    self._read_listeners.append(listener)

  def register_skip_listener(self, listener):
    """
    Registers a listener for files that are skipped. This listener is expected
    to be a functor of the form...

    ::

      my_listener(path, exception)

    :param functor listener: functor to be notified of files that are skipped
      to read errors or because they couldn't be parsed as valid descriptor data
    """

    self._skip_listeners.append(listener)

  def get_buffered_descriptor_count(self):
    """
    Provides the number of descriptors that are waiting to be iterated over.
    This is limited to the buffer_size that we were constructed with.

    :returns: **int** for the estimated number of currently enqueued
      descriptors, this is not entirely reliable
    """

    return self._unreturned_descriptors.qsize()

  def start(self):
    """
    Starts reading our descriptor files.

    :raises: **ValueError** if we're already reading the descriptor files
    """

    with self._reader_thread_lock:
      if self._reader_thread:
        raise ValueError('Already running, you need to call stop() first')

      self._is_stopped.clear()

      reader_thread = threading.Thread(target = self._read_descriptor_files, name = 'Descriptor Reader')
      reader_thread.daemon = True  # don't keep the interpreter alive for us
      reader_thread.start()

      self._reader_thread = reader_thread

  def stop(self):
    """
    Stops further reading of descriptor files. Descriptors that are buffered
    but haven't yet been iterated over are discarded. This is a no-op beyond
    that if we aren't presently running.
    """

    with self._reader_thread_lock:
      self._is_stopped.set()
      self._iter_notice.set()  # wakes our iterator so it sees that we stopped

      # clears our queue to unblock enqueue calls
      self._clear_unreturned_descriptors()

      if self._reader_thread is None:
        return  # never started, or already stopped

      self._reader_thread.join()
      self._reader_thread = None

      # The reader may have enqueued one last entry between our clearing the
      # queue and it noticing that we stopped. Now that it's done, drop that
      # so it isn't provided by a later run.

      self._clear_unreturned_descriptors()

      if self._persistence_path:
        # best-effort, as documented (see the constructor)

        try:
          processed_files = self.get_processed_files()
          save_processed_files(self._persistence_path, processed_files)
        except Exception:
          pass

  def _clear_unreturned_descriptors(self):
    # Discards everything that's presently buffered for our caller.

    try:
      while True:
        self._unreturned_descriptors.get_nowait()
    except Queue.Empty:
      pass

  def _read_descriptor_files(self):
    # Body of the reader thread. Processes each of our targets in order until
    # we've gone through them all or are stopped.

    new_processed_files = {}

    try:
      for target in self._targets:
        if self._is_stopped.is_set():
          break

        if not os.path.exists(target):
          self._notify_skip_listeners(target, FileMissing())
          continue

        if os.path.isdir(target):
          walker = os.walk(target, followlinks = self._follow_links)
          self._handle_walker(walker, new_processed_files)
        else:
          self._handle_file(target, new_processed_files)
    finally:
      # Done in a finally block so that an unexpected error (for instance from
      # a listener) still ends the run. Otherwise our iterator would wait
      # indefinitely for a FINISHED entry that never arrives.

      self._processed_files = new_processed_files

      if not self._is_stopped.is_set():
        self._unreturned_descriptors.put(FINISHED)

      self._iter_notice.set()

  def __iter__(self):
    """
    Provides the descriptors our reader thread has parsed, waiting for more
    when none are buffered. This ends when the reader thread signals that
    it's done or we are stopped, and provides nothing if we aren't running.
    """

    with self._iter_lock:
      while not self._is_stopped.is_set():
        try:
          descriptor = self._unreturned_descriptors.get_nowait()
        except Queue.Empty:
          # Nothing buffered, so wait until the reader thread (or stop())
          # notifies us then check again.

          self._iter_notice.wait()
          self._iter_notice.clear()
          continue

        if descriptor == FINISHED:
          break

        yield descriptor

  def _handle_walker(self, walker, new_processed_files):
    # Processes every file provided by an os.walk() of a directory.

    for root, _, files in walker:
      for filename in files:
        self._handle_file(os.path.join(root, filename), new_processed_files)

        # this can take a while if, say, we're including the root directory
        if self._is_stopped.is_set():
          return

  def _handle_file(self, target, new_processed_files):
    # This is a file. Register its last modified timestamp and check if
    # it's a file that we should skip.

    try:
      last_modified = int(os.stat(target).st_mtime)
    except OSError as exc:
      self._notify_skip_listeners(target, ReadFailed(exc))
      return

    last_used = self._processed_files.get(target)
    new_processed_files[target] = last_modified

    # Explicitly checking for None since zero is a valid timestamp for us to
    # have recorded.

    if last_used is not None and last_used >= last_modified:
      self._notify_skip_listeners(target, AlreadyRead(last_modified, last_used))
      return

    # Block devices and such are never descriptors, and can cause us to block
    # for quite a while so skipping anything that isn't a regular file.

    if not os.path.isfile(target):
      return

    # The mimetypes module only checks the file extension. To actually
    # check the content (like the 'file' command) we'd need something like
    # pymagic (https://github.com/cloudburst/pymagic).

    target_type = mimetypes.guess_type(target)

    if target_type[0] in (None, 'text/plain'):
      # either '.txt' or an unknown type
      self._handle_descriptor_file(target, target_type)
    elif stem.util.system.is_tarfile(target):
      # handles gzip, bz2, and decompressed tarballs among others
      self._handle_archive(target)
    else:
      self._notify_skip_listeners(target, UnrecognizedType(target_type))

  def _handle_descriptor_file(self, target, mime_type):
    # Parses a plaintext file, buffering the descriptors it contains. Skip
    # listeners are notified if it can't be read or parsed.

    try:
      self._notify_read_listeners(target)

      with open(target, 'rb') as target_file:
        for desc in stem.descriptor.parse_file(target_file, validate = self._validate, document_handler = self._document_handler, **self._kwargs):
          if self._is_stopped.is_set():
            return

          # blocks while our buffer is full
          self._unreturned_descriptors.put(desc)
          self._iter_notice.set()
    except TypeError:
      self._notify_skip_listeners(target, UnrecognizedType(mime_type))
    except ValueError as exc:
      self._notify_skip_listeners(target, ParsingFailure(exc))
    except IOError as exc:
      self._notify_skip_listeners(target, ReadFailed(exc))

  def _handle_archive(self, target):
    # Parses each of the regular files within a tarball, buffering the
    # descriptors they contain. Skip listeners are notified of entries that
    # can't be parsed, or if the archive itself can't be read.
    #
    # TODO: When dropping python 2.6 support go back to using 'with' for
    # tarfiles...
    #
    #   http://bugs.python.org/issue7232

    tar_file = None

    try:
      self._notify_read_listeners(target)
      tar_file = tarfile.open(target)

      for tar_entry in tar_file:
        if not tar_entry.isfile():
          continue

        entry = tar_file.extractfile(tar_entry)

        try:
          for desc in stem.descriptor.parse_file(entry, validate = self._validate, document_handler = self._document_handler, **self._kwargs):
            if self._is_stopped.is_set():
              return

            desc._set_path(os.path.abspath(target))
            desc._set_archive_path(entry.name)

            # blocks while our buffer is full
            self._unreturned_descriptors.put(desc)
            self._iter_notice.set()
        except (TypeError, ValueError) as exc:
          self._notify_skip_listeners(target, ParsingFailure(exc))
        finally:
          entry.close()
    except (IOError, tarfile.TarError) as exc:
      # the tarfile module raises TarError subclasses for malformed archives
      self._notify_skip_listeners(target, ReadFailed(exc))
    finally:
      if tar_file:
        tar_file.close()

  def _notify_read_listeners(self, path):
    # Informs our read listeners that we're about to process a file.

    for listener in self._read_listeners:
      listener(path)

  def _notify_skip_listeners(self, path, exception):
    # Informs our skip listeners of a file that we won't be providing
    # descriptors from, along with the FileSkipped describing why.

    for listener in self._skip_listeners:
      listener(path, exception)

  def __enter__(self):
    self.start()
    return self

  def __exit__(self, exit_type, value, traceback):
    self.stop()