File: test_lib.py

package info (click to toggle)
plaso 20201007-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 519,924 kB
  • sloc: python: 79,002; sh: 629; xml: 72; sql: 14; vhdl: 11; makefile: 10
file content (71 lines) | stat: -rw-r--r-- 2,065 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# -*- coding: utf-8 -*-
"""Multi-processing related functions and classes for testing."""

from __future__ import unicode_literals

from plaso.engine import knowledge_base
from plaso.parsers import mediator as parsers_mediator
from plaso.storage.fake import writer as fake_writer

from tests import test_lib as shared_test_lib


class MultiProcessingTestCase(shared_test_lib.BaseTestCase):
  """Base test case with factory helpers for multi-processing tests."""

  def _CreateKnowledgeBase(self, knowledge_base_values=None, timezone='UTC'):
    """Builds a knowledge base populated with the given values and time zone.

    Args:
      knowledge_base_values (Optional[dict]): values to set in the knowledge
          base, keyed by identifier. Nothing is set when this is None or
          empty.
      timezone (str): time zone to set on the knowledge base.

    Returns:
      KnowledgeBase: the newly created knowledge base.
    """
    new_knowledge_base = knowledge_base.KnowledgeBase()

    # Fall back to an empty mapping so a missing argument means "no values".
    for identifier, value in (knowledge_base_values or {}).items():
      new_knowledge_base.SetValue(identifier, value)

    # The time zone is applied after the values have been set.
    new_knowledge_base.SetTimeZone(timezone)
    return new_knowledge_base

  def _CreateParserMediator(
      self, storage_writer, knowledge_base_object, file_entry=None,
      parser_chain=None):
    """Builds a parser mediator on top of a storage writer and knowledge base.

    Args:
      storage_writer (StorageWriter): storage writer passed to the mediator.
      knowledge_base_object (KnowledgeBase): knowledge base passed to the
          mediator.
      file_entry (Optional[dfvfs.FileEntry]): file entry object being parsed;
          only set on the mediator when it evaluates as true.
      parser_chain (Optional[str]): parsing chain up to this point; only
          assigned to the mediator when it evaluates as true.

    Returns:
      ParserMediator: the newly created parser mediator.
    """
    mediator = parsers_mediator.ParserMediator(
        storage_writer, knowledge_base_object)

    # Both optional settings are skipped when their argument is falsy.
    if file_entry:
      mediator.SetFileEntry(file_entry)
    if parser_chain:
      mediator.parser_chain = parser_chain

    return mediator

  def _CreateStorageWriter(self, session):
    """Builds a fake storage writer for a session and opens it.

    Args:
      session (Session): session passed to the storage writer.

    Returns:
      FakeStorageWriter: the storage writer, after Open() has been called.
    """
    writer = fake_writer.FakeStorageWriter(session)
    writer.Open()
    return writer