File: test_lib.py

package info (click to toggle)
plaso 20241006-3
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 673,224 kB
  • sloc: python: 91,831; sh: 557; xml: 97; makefile: 17; sql: 14; vhdl: 11
file content (60 lines) | stat: -rw-r--r-- 2,095 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# -*- coding: utf-8 -*-
"""Preprocessing related functions and classes for testing."""

from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry
from dfvfs.helpers import file_system_searcher

from plaso.preprocessors import mediator
from plaso.storage.fake import writer as fake_writer

from tests import test_lib as shared_test_lib


class ArtifactPreprocessorPluginTestCase(shared_test_lib.BaseTestCase):
  """Artifact preprocessor plugin test case."""

  @classmethod
  def setUpClass(cls):
    """Makes preparations before running any of the tests."""
    artifacts_path = shared_test_lib.GetTestFilePath(['artifacts'])
    cls._artifacts_registry = artifacts_registry.ArtifactDefinitionsRegistry()

    reader = artifacts_reader.YamlArtifactsReader()
    cls._artifacts_registry.ReadFromDirectory(reader, artifacts_path)

  def _CreateTestStorageWriter(self):
    """Creates a storage writer for testing purposes.

    Returns:
      StorageWriter: storage writer.
    """
    storage_writer = fake_writer.FakeStorageWriter()
    storage_writer.Open()
    return storage_writer

  def _RunPreprocessorPluginOnFileSystem(
      self, file_system, mount_point, storage_writer, plugin):
    """Runs a preprocessor plugin on a file system.

    Args:
      file_system (dfvfs.FileSystem): file system to be preprocessed.
      mount_point (dfvfs.PathSpec): mount point path specification that refers
          to the base location of the file system.
      storage_writer (StorageWriter): storage writer.
      plugin (ArtifactPreprocessorPlugin): preprocessor plugin.

    Return:
      PreprocessMediator: preprocess mediator.
    """
    artifact_definition = self._artifacts_registry.GetDefinitionByName(
        plugin.ARTIFACT_DEFINITION_NAME)
    self.assertIsNotNone(artifact_definition)

    test_mediator = mediator.PreprocessMediator(storage_writer)

    searcher = file_system_searcher.FileSystemSearcher(file_system, mount_point)

    plugin.Collect(test_mediator, artifact_definition, searcher, file_system)

    return test_mediator