File: multi_process_queue.py

package info (click to toggle)
plaso 20190131-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 410,832 kB
  • sloc: python: 76,636; sh: 926; makefile: 167; xml: 70; sql: 14; vhdl: 11
file content (77 lines) | stat: -rw-r--r-- 1,898 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Tests the multi-processing queue."""

from __future__ import unicode_literals

import unittest

from plaso.engine import plaso_queue
from plaso.lib import errors
from plaso.multi_processing import multi_process_queue

from tests import test_lib as shared_test_lib


class TestQueueConsumer(object):
  """Test helper that drains a queue into a list.

  Items are popped from the queue one at a time and collected until the
  queue raises QueueClose or QueueEmpty, or a QueueAbort item is popped.

  Attributes:
    items (list[object]): items popped from the queue, in the order popped.
  """

  def __init__(self, queue):
    """Initializes the queue consumer.

    Args:
      queue (Queue): queue to pop items from.
    """
    super(TestQueueConsumer, self).__init__()
    self.items = []
    self._queue = queue
    self._abort = False

  @property
  def number_of_items(self):
    """int: number of items consumed so far."""
    consumed_items = self.items
    return len(consumed_items)

  def ConsumeItems(self):
    """Pops items off the queue until there is nothing more to consume.

    Consumption ends when the abort flag is set, when the queue raises
    QueueClose or QueueEmpty, or when a QueueAbort item is popped. The
    QueueAbort item itself is not stored.
    """
    while True:
      if self._abort:
        return

      try:
        popped_item = self._queue.PopItem()
      except (errors.QueueClose, errors.QueueEmpty):
        return

      # An abort marker ends consumption and is not kept as an item.
      if isinstance(popped_item, plaso_queue.QueueAbort):
        return

      self.items.append(popped_item)


class MultiProcessingQueueTest(shared_test_lib.BaseTestCase):
  """Tests for the multi-processing queue."""

  _ITEMS = frozenset(['item1', 'item2', 'item3', 'item4'])

  def testPushPopItem(self):
    """Tests the PushItem and PopItem functions."""
    expected_number_of_items = len(self._ITEMS)

    # NOTE(review): the timeout presumably bounds how long PopItem waits on
    # an empty queue, so that the consumer below stops instead of blocking
    # the current process indefinitely - confirm against MultiProcessingQueue.
    queue = multi_process_queue.MultiProcessingQueue(timeout=0.1)
    for queued_item in self._ITEMS:
      queue.PushItem(queued_item)

    consumer = TestQueueConsumer(queue)
    consumer.ConsumeItems()

    self.assertEqual(consumer.number_of_items, expected_number_of_items)


# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()