1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the preprocess plugins manager."""
from __future__ import unicode_literals
import unittest
from artifacts import reader as artifacts_reader
from artifacts import registry as artifacts_registry
from plaso.engine import knowledge_base as knowledge_base_library
from plaso.preprocessors import interface
from plaso.preprocessors import manager
from tests import test_lib as shared_test_lib
class TestArtifactPreprocessorPlugin(interface.ArtifactPreprocessorPlugin):
  """Minimal artifact preprocessor plugin used by the manager tests."""

  ARTIFACT_DEFINITION_NAME = 'TestArtifactDefinition'

  # pylint: disable=unused-argument

  def ParseValueData(self, knowledge_base, value_data):
    """Accepts artifact value data without doing anything with it.

    Both arguments are intentionally ignored by this test plugin.

    Args:
      knowledge_base (KnowledgeBase): to fill with preprocessing information.
      value_data (object): artifact value data.
    """
    return None
class PreprocessPluginsManagerTest(shared_test_lib.BaseTestCase):
  """Test case for the preprocess plugins manager."""

  # pylint: disable=protected-access

  def testCollectFromFileSystem(self):
    """Tests the CollectFromFileSystem function.

    Currently this only loads the test artifact definitions and creates a
    knowledge base; the CollectFromFileSystem call itself is still a TODO.
    """
    test_artifacts_path = self._GetTestFilePath(['artifacts'])
    self._SkipIfPathNotExists(test_artifacts_path)

    definitions_registry = artifacts_registry.ArtifactDefinitionsRegistry()
    definitions_registry.ReadFromDirectory(
        artifacts_reader.YamlArtifactsReader(), test_artifacts_path)

    # The knowledge base is created but not used yet, see the TODO below.
    _ = knowledge_base_library.KnowledgeBase()

    # TODO: implement, passing the registry and the knowledge base created
    # above:
    # manager.PreprocessPluginsManager.CollectFromFileSystem(
    #     registry, knowledge_base_object, None, None)

  # TODO: add tests for CollectFromWindowsRegistry
  # TODO: add tests for GetNames

  def testRegistrationPlugin(self):
    """Tests RegisterPlugin and DeregisterPlugin functions."""
    plugins_manager = manager.PreprocessPluginsManager
    plugin_class = TestArtifactPreprocessorPlugin
    initial_count = len(plugins_manager._plugins)

    plugins_manager.RegisterPlugin(plugin_class)
    self.assertEqual(len(plugins_manager._plugins), initial_count + 1)

    # Registering the same plugin class a second time is expected to fail.
    with self.assertRaises(KeyError):
      plugins_manager.RegisterPlugin(plugin_class)

    plugins_manager.DeregisterPlugin(plugin_class)
    self.assertEqual(len(plugins_manager._plugins), initial_count)

  # TODO: add tests for RegisterPlugins
  # TODO: add tests for RunPlugins
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
|