1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
# -*- coding: utf-8 -*-
"""Tests for the artifact definitions registry."""
import io
import unittest
from artifacts import errors
from artifacts import reader
from artifacts import registry
from artifacts import source_type
from tests import test_lib
class TestSourceType(source_type.SourceType):
"""Class that implements a test source type."""
TYPE_INDICATOR = u'test'
def __init__(self, test=None):
"""Initializes the source type object.
Args:
test (Optional[str]): test string.
Raises:
FormatError: when test is not set.
"""
if not test:
raise errors.FormatError(u'Missing test value.')
super(TestSourceType, self).__init__()
self.test = test
def AsDict(self):
"""Represents a source type as a dictionary.
Returns:
dict[str, str]: source type attributes.
"""
return {u'test': self.test}
class ArtifactDefinitionsRegistryTest(test_lib.BaseTestCase):
"""Tests for the artifact definitions registry."""
# pylint: disable=protected-access
@test_lib.skipUnlessHasTestFile(['definitions.yaml'])
def testArtifactDefinitionsRegistry(self):
"""Tests the ArtifactDefinitionsRegistry functions."""
artifact_registry = registry.ArtifactDefinitionsRegistry()
artifact_reader = reader.YamlArtifactsReader()
test_file = self._GetTestFilePath(['definitions.yaml'])
for artifact_definition in artifact_reader.ReadFile(test_file):
artifact_registry.RegisterDefinition(artifact_definition)
# Make sure the test file got turned into artifacts.
self.assertEqual(len(artifact_registry.GetDefinitions()), 7)
artifact_definition = artifact_registry.GetDefinitionByName(u'EventLogs')
self.assertIsNotNone(artifact_definition)
# Try to register something already registered
with self.assertRaises(KeyError):
artifact_registry.RegisterDefinition(artifact_definition)
# Deregister
artifact_registry.DeregisterDefinition(artifact_definition)
# Check it is gone
with self.assertRaises(KeyError):
artifact_registry.DeregisterDefinition(artifact_definition)
self.assertEqual(len(artifact_registry.GetDefinitions()), 6)
test_artifact_definition = artifact_registry.GetDefinitionByName(
u'SecurityEventLogEvtx')
self.assertIsNotNone(test_artifact_definition)
self.assertEqual(test_artifact_definition.name, u'SecurityEventLogEvtx')
expected_description = (
u'Windows Security Event log for Vista or later systems.')
self.assertEqual(test_artifact_definition.description, expected_description)
bad_args = io.BytesIO(
b'name: SecurityEventLogEvtx\n'
b'doc: Windows Security Event log for Vista or later systems.\n'
b'sources:\n'
b'- type: FILE\n'
b' attributes: {broken: [\'%%environ_systemroot%%\\System32\\'
b'winevt\\Logs\\Security.evtx\']}\n'
b'conditions: [os_major_version >= 6]\n'
b'labels: [Logs]\n'
b'supported_os: [Windows]\n'
b'urls: [\'http://www.forensicswiki.org/wiki/\n'
b'Windows_XML_Event_Log_(EVTX)\']\n')
generator = artifact_reader.ReadFileObject(bad_args)
with self.assertRaises(errors.FormatError):
next(generator)
def testSourceTypeFunctions(self):
"""Tests the source type functions."""
number_of_source_types = len(
registry.ArtifactDefinitionsRegistry._source_type_classes)
registry.ArtifactDefinitionsRegistry.RegisterSourceType(TestSourceType)
self.assertEqual(
len(registry.ArtifactDefinitionsRegistry._source_type_classes),
number_of_source_types + 1)
with self.assertRaises(KeyError):
registry.ArtifactDefinitionsRegistry.RegisterSourceType(TestSourceType)
registry.ArtifactDefinitionsRegistry.DeregisterSourceType(TestSourceType)
self.assertEqual(
len(registry.ArtifactDefinitionsRegistry._source_type_classes),
number_of_source_types)
registry.ArtifactDefinitionsRegistry.RegisterSourceTypes([TestSourceType])
self.assertEqual(
len(registry.ArtifactDefinitionsRegistry._source_type_classes),
number_of_source_types + 1)
with self.assertRaises(KeyError):
registry.ArtifactDefinitionsRegistry.RegisterSourceTypes([TestSourceType])
source_object = registry.ArtifactDefinitionsRegistry.CreateSourceType(
u'test', {u'test': u'test123'})
self.assertIsNotNone(source_object)
self.assertEqual(source_object.test, u'test123')
with self.assertRaises(errors.FormatError):
source_object = registry.ArtifactDefinitionsRegistry.CreateSourceType(
u'test', {})
with self.assertRaises(errors.FormatError):
source_object = registry.ArtifactDefinitionsRegistry.CreateSourceType(
u'bogus', {})
registry.ArtifactDefinitionsRegistry.DeregisterSourceType(TestSourceType)
if __name__ == '__main__':
unittest.main()
|