File: analysis_process.py

package info (click to toggle)
plaso 20241006-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 673,228 kB
  • sloc: python: 91,831; sh: 557; xml: 97; makefile: 17; sql: 14; vhdl: 11
file content (135 lines) | stat: -rw-r--r-- 4,496 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the multi-processing analysis process."""

import os
import time
import unittest

from plaso.analysis import interface as analysis_interface
from plaso.engine import configurations
from plaso.multi_process import analysis_process
from plaso.multi_process import plaso_queue
from plaso.multi_process import zeromq_queue

from tests import test_lib as shared_test_lib
from tests.multi_process import test_lib


class TestAnalysisPlugin(analysis_interface.AnalysisPlugin):
  """Analysis plugin for testing."""

  NAME = 'test_plugin'

  # pylint: disable=arguments-renamed
  # pylint: disable=unused-argument
  def CompileReport(self, mediator):
    """Compiles a report of the analysis.

    After the plugin has received every copy of an event to
    analyze this function will be called so that the report
    can be assembled.

    Args:
      mediator (AnalysisMediator): mediates interactions between
          analysis plugins and other components, such as storage and dfvfs.
    """
    return

  def ExamineEvent(self, mediator, event, event_data, event_data_stream):
    """Analyzes an event.

    Args:
      mediator (AnalysisMediator): mediates interactions between analysis
          plugins and other components, such as storage and dfvfs.
      event (EventObject): event.
      event_data (EventData): event data.
      event_data_stream (EventDataStream): event data stream.
    """
    return


class AnalysisProcessTest(test_lib.MultiProcessingTestCase):
  """Tests the multi-processing analysis process."""

  # pylint: disable=protected-access

  _QUEUE_TIMEOUT = 5

  def testInitialization(self):
    """Tests the initialization."""
    with shared_test_lib.TempDirectory() as temp_directory:
      configuration = configurations.ProcessingConfiguration()
      configuration.task_storage_path = temp_directory

      test_process = analysis_process.AnalysisProcess(
          None, None, configuration, [], name='TestAnalysis')
      self.assertIsNotNone(test_process)

  def testGetStatus(self):
    """Tests the _GetStatus function."""
    with shared_test_lib.TempDirectory() as temp_directory:
      configuration = configurations.ProcessingConfiguration()
      configuration.task_storage_path = temp_directory

      test_process = analysis_process.AnalysisProcess(
          None, None, configuration, [], name='TestAnalysis')
      status_attributes = test_process._GetStatus()

      self.assertIsNotNone(status_attributes)
      self.assertEqual(status_attributes['identifier'], 'TestAnalysis')
      self.assertIsNone(status_attributes['number_of_produced_reports'])

      # TODO: add test with analysis mediator.

  def testMain(self):
    """Tests the _Main function."""
    output_event_queue = zeromq_queue.ZeroMQPushBindQueue(
        name='test output event queue', timeout_seconds=self._QUEUE_TIMEOUT)
    output_event_queue.Open()

    input_event_queue = zeromq_queue.ZeroMQPullConnectQueue(
        name='test input event queue', delay_open=True,
        port=output_event_queue.port,
        timeout_seconds=self._QUEUE_TIMEOUT)

    analysis_plugin = TestAnalysisPlugin()

    with shared_test_lib.TempDirectory() as temp_directory:
      # Set up the processed for the task storage file generated by the
      # analysis plugin.
      os.mkdir(os.path.join(temp_directory, 'processed'))

      configuration = configurations.ProcessingConfiguration()
      configuration.task_storage_path = temp_directory

      test_process = analysis_process.AnalysisProcess(
          input_event_queue, analysis_plugin, configuration, [],
          name='TestAnalysis')

      setattr(test_process, '_FOREMAN_STATUS_WAIT', 1)

      test_process.start()

      output_event_queue.PushItem(plaso_queue.QueueAbort(), block=False)
      output_event_queue.Close(abort=True)

      # Sleep for 1 second to allow the analysis process to terminate.
      # Before the temporary directory is removed.
      time.sleep(1)

  # TODO: add test for _ProcessEvent.

  def testSignalAbort(self):
    """Tests the SignalAbort function."""
    with shared_test_lib.TempDirectory() as temp_directory:
      configuration = configurations.ProcessingConfiguration()
      configuration.task_storage_path = temp_directory

      test_process = analysis_process.AnalysisProcess(
          None, None, configuration, [], name='TestAnalysis')
      test_process.SignalAbort()


if __name__ == '__main__':
  unittest.main()