File: mediator.py

package info (click to toggle)
plaso 20201007-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 519,924 kB
  • sloc: python: 79,002; sh: 629; xml: 72; sql: 14; vhdl: 11; makefile: 10
file content (56 lines) | stat: -rw-r--r-- 1,743 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the analysis mediator."""

from __future__ import unicode_literals

import unittest

from dfvfs.lib import definitions as dfvfs_definitions
from dfvfs.path import factory as path_spec_factory

from plaso.analysis import mediator
from plaso.containers import sessions
from plaso.storage.fake import writer as fake_writer

from tests.analysis import test_lib


class AnalysisMediatorTest(test_lib.AnalysisPluginTestCase):
  """Test case for the analysis mediator."""

  def testGetDisplayNameForPathSpec(self):
    """Tests that GetDisplayNameForPathSpec yields an OS-prefixed name."""
    # The mediator is built on a fake storage writer for a fresh session and
    # the knowledge base set up by the test case.
    test_mediator = mediator.AnalysisMediator(
        fake_writer.FakeStorageWriter(sessions.Session()),
        self._SetUpKnowledgeBase())

    location = self._GetTestFilePath(['syslog.gz'])
    path_spec = path_spec_factory.Factory.NewPathSpec(
        dfvfs_definitions.TYPE_INDICATOR_OS, location=location)

    # The display name is expected to be the location prefixed with "OS:".
    expected_name = 'OS:{0:s}'.format(location)
    self.assertEqual(
        test_mediator.GetDisplayNameForPathSpec(path_spec), expected_name)

  # TODO: add test for GetUsernameForPath.
  # TODO: add test for ProduceAnalysisReport.
  # TODO: add test for ProduceEventTag.

  def testSignalAbort(self):
    """Tests that SignalAbort can be called on a newly created mediator."""
    test_mediator = mediator.AnalysisMediator(
        fake_writer.FakeStorageWriter(sessions.Session()),
        self._SetUpKnowledgeBase())

    # No assertion follows: the test only checks that the call does not raise.
    test_mediator.SignalAbort()

if __name__ == '__main__':
  # Run the tests in this module when the file is executed as a script.
  unittest.main()