File: extraction_engine.py

package info (click to toggle)
plaso 20241006-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 673,228 kB
  • sloc: python: 91,831; sh: 557; xml: 97; makefile: 17; sql: 14; vhdl: 11
file content (97 lines) | stat: -rw-r--r-- 3,569 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests the multi-process processing engine."""

import collections
import os
import unittest

from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.path import factory as path_spec_factory

from plaso.containers import sessions
from plaso.lib import definitions
from plaso.engine import configurations
from plaso.multi_process import extraction_engine
from plaso.storage.sqlite import writer as sqlite_writer

from tests import test_lib as shared_test_lib


class ExtractionMultiProcessEngineTest(shared_test_lib.BaseTestCase):
  """Tests for the task-based multi-process extraction engine."""

  def testProcessSource(self):
    """Tests the PreprocessSource and ProcessSourceMulti functions.

    Runs the engine with only the filestat parser against the test storage
    media image, writing into a SQLite store inside a temporary directory,
    and checks the resulting event, warning and per-parser counts.

    The test is skipped when the artifacts directory or the test image is
    not available.
    """
    artifacts_path = shared_test_lib.GetTestFilePath(['artifacts'])
    self._SkipIfPathNotExists(artifacts_path)

    engine = extraction_engine.ExtractionMultiProcessEngine(
        maximum_number_of_tasks=100)
    engine.BuildArtifactsRegistry(artifacts_path, None)

    image_path = self._GetTestFilePath(['ímynd.dd'])
    self._SkipIfPathNotExists(image_path)

    # Path specification chain: OS file -> TSK file system root.
    image_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=image_path)
    root_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
        parent=image_path_spec)
    source_path_specs = [root_path_spec]

    session = sessions.Session()

    configuration = configurations.ProcessingConfiguration()
    configuration.data_location = shared_test_lib.DATA_PATH
    configuration.parser_filter_expression = 'filestat'
    configuration.task_storage_format = definitions.STORAGE_FORMAT_SQLITE

    with shared_test_lib.TempDirectory() as work_directory:
      storage_path = os.path.join(work_directory, 'storage.plaso')
      storage_writer = sqlite_writer.SQLiteStorageWriter()
      storage_writer.Open(path=storage_path)

      try:
        system_configurations = engine.PreprocessSource(
            source_path_specs, storage_writer)

        # ProcessSourceMulti is used instead of ProcessSource since pylint
        # 2.6.0 and later gets confused about keyword arguments when
        # ProcessSource is used.
        status = engine.ProcessSourceMulti(
            storage_writer, session.identifier, configuration,
            system_configurations, source_path_specs,
            storage_file_path=work_directory)

        # Collect the totals while the storage writer is still open.
        container_counts = {
            container_type: storage_writer.GetNumberOfAttributeContainers(
                container_type)
            for container_type in (
                'event', 'extraction_warning', 'recovery_warning')}

        parsers_counter = collections.Counter()
        for parser_count in storage_writer.GetAttributeContainers(
            'parser_count'):
          parsers_counter[parser_count.name] = parser_count.number_of_events

      finally:
        storage_writer.Close()

    self.assertFalse(status.aborted)

    self.assertEqual(container_counts['event'], 15)
    self.assertEqual(container_counts['extraction_warning'], 0)
    self.assertEqual(container_counts['recovery_warning'], 0)

    self.assertEqual(
        parsers_counter, collections.Counter(filestat=15, total=15))


# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()