File: single_process.py

package info (click to toggle)
plaso 20201007-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 519,924 kB
  • sloc: python: 79,002; sh: 629; xml: 72; sql: 14; vhdl: 11; makefile: 10
file content (65 lines) | stat: -rw-r--r-- 2,133 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests the single process processing engine."""

from __future__ import unicode_literals

import unittest

from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry
from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.path import factory as path_spec_factory
from dfvfs.resolver import context

from plaso.containers import sessions
from plaso.engine import configurations
from plaso.engine import single_process
from plaso.storage.fake import writer as fake_writer

from tests import test_lib as shared_test_lib


class SingleProcessEngineTest(shared_test_lib.BaseTestCase):
  """Tests for the single process engine object."""

  # pylint: disable=protected-access

  def testProcessSources(self):
    """Tests the ProcessSources function."""
    test_artifacts_path = self._GetTestFilePath(['artifacts'])
    self._SkipIfPathNotExists(test_artifacts_path)

    test_file_path = self._GetTestFilePath(['ímynd.dd'])
    self._SkipIfPathNotExists(test_file_path)

    registry = artifacts_registry.ArtifactDefinitionsRegistry()
    reader = artifacts_reader.YamlArtifactsReader()
    registry.ReadFromDirectory(reader, test_artifacts_path)

    test_engine = single_process.SingleProcessEngine()
    resolver_context = context.Context()
    session = sessions.Session()

    os_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
    source_path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
        parent=os_path_spec)

    test_engine.PreprocessSources(registry, [source_path_spec])

    storage_writer = fake_writer.FakeStorageWriter(session)

    configuration = configurations.ProcessingConfiguration()
    configuration.parser_filter_expression = 'filestat'

    test_engine.ProcessSources(
        session, [source_path_spec], storage_writer, resolver_context,
        configuration)

    self.assertEqual(storage_writer.number_of_events, 15)


if __name__ == '__main__':
  unittest.main()