File: analysis_process.py

package info (click to toggle)
plaso 20201007-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 519,924 kB
  • sloc: python: 79,002; sh: 629; xml: 72; sql: 14; vhdl: 11; makefile: 10
file content (119 lines) | stat: -rw-r--r-- 3,773 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the multi-processing analysis process."""

from __future__ import unicode_literals

import unittest

from plaso.analysis import interface as analysis_interface
from plaso.containers import sessions
from plaso.engine import configurations
from plaso.engine import plaso_queue
from plaso.engine import zeromq_queue
from plaso.multi_processing import analysis_process

from tests.multi_processing import test_lib


class TestAnalysisPlugin(analysis_interface.AnalysisPlugin):
  """Analysis plugin for testing."""

  NAME = 'test_plugin'

  # pylint: disable=unused-argument
  def CompileReport(self, mediator):
    """Compiles a report of the analysis.

    After the plugin has received every copy of an event to
    analyze this function will be called so that the report
    can be assembled.

    Args:
      mediator (AnalysisMediator): mediates interactions between
          analysis plugins and other components, such as storage and dfvfs.
    """
    return

  def ExamineEvent(self, mediator, event, event_data, event_data_stream):
    """Analyzes an event.

    Args:
      mediator (AnalysisMediator): mediates interactions between analysis
          plugins and other components, such as storage and dfvfs.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
    """
    return


class AnalysisProcessTest(test_lib.MultiProcessingTestCase):
  """Tests the multi-processing analysis process."""

  # pylint: disable=protected-access

  _QUEUE_TIMEOUT = 5

  def testInitialization(self):
    """Tests the initialization."""
    configuration = configurations.ProcessingConfiguration()

    test_process = analysis_process.AnalysisProcess(
        None, None, None, None, configuration, name='TestAnalysis')
    self.assertIsNotNone(test_process)

  def testGetStatus(self):
    """Tests the _GetStatus function."""
    configuration = configurations.ProcessingConfiguration()

    test_process = analysis_process.AnalysisProcess(
        None, None, None, None, configuration, name='TestAnalysis')
    status_attributes = test_process._GetStatus()

    self.assertIsNotNone(status_attributes)
    self.assertEqual(status_attributes['identifier'], 'TestAnalysis')
    self.assertIsNone(status_attributes['number_of_produced_reports'])

    # TODO: add test with analysis mediator.

  def testMain(self):
    """Tests the _Main function."""
    output_event_queue = zeromq_queue.ZeroMQPushBindQueue(
        name='test output event queue', timeout_seconds=self._QUEUE_TIMEOUT)
    output_event_queue.Open()

    input_event_queue = zeromq_queue.ZeroMQPullConnectQueue(
        name='test input event queue', delay_open=True,
        port=output_event_queue.port,
        timeout_seconds=self._QUEUE_TIMEOUT)

    session = sessions.Session()
    storage_writer = self._CreateStorageWriter(session)
    analysis_plugin = TestAnalysisPlugin()

    configuration = configurations.ProcessingConfiguration()

    test_process = analysis_process.AnalysisProcess(
        input_event_queue, storage_writer, None, analysis_plugin, configuration,
        name='TestAnalysis')
    test_process._FOREMAN_STATUS_WAIT = 1

    test_process.start()

    output_event_queue.PushItem(plaso_queue.QueueAbort(), block=False)
    output_event_queue.Close(abort=True)

  # TODO: add test for _ProcessEvent.

  def testSignalAbort(self):
    """Tests the SignalAbort function."""

    configuration = configurations.ProcessingConfiguration()
    test_process = analysis_process.AnalysisProcess(
        None, None, None, None, configuration, name='TestAnalysis')
    test_process.SignalAbort()


if __name__ == '__main__':
  unittest.main()