File: registry_test.py

package info (click to toggle)
forensic-artifacts 20190113-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 712 kB
  • sloc: python: 1,669; sh: 166; makefile: 21
file content (153 lines) | stat: -rw-r--r-- 4,921 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# -*- coding: utf-8 -*-
"""Tests for the artifact definitions registry."""

import io
import unittest

from artifacts import errors
from artifacts import reader
from artifacts import registry
from artifacts import source_type

from tests import test_lib


class TestSourceType(source_type.SourceType):
  """Class that implements a test source type."""

  TYPE_INDICATOR = u'test'

  def __init__(self, test=None):
    """Initializes the source type object.

    Args:
      test (Optional[str]): test string.

    Raises:
      FormatError: when test is not set.
    """
    if not test:
      raise errors.FormatError(u'Missing test value.')

    super(TestSourceType, self).__init__()
    self.test = test

  def AsDict(self):
    """Represents a source type as a dictionary.

    Returns:
      dict[str, str]: source type attributes.
    """
    return {u'test': self.test}


class ArtifactDefinitionsRegistryTest(test_lib.BaseTestCase):
  """Tests for the artifact definitions registry."""

  # pylint: disable=protected-access

  @test_lib.skipUnlessHasTestFile(['definitions.yaml'])
  def testArtifactDefinitionsRegistry(self):
    """Tests the ArtifactDefinitionsRegistry functions."""
    artifact_registry = registry.ArtifactDefinitionsRegistry()

    artifact_reader = reader.YamlArtifactsReader()
    test_file = self._GetTestFilePath(['definitions.yaml'])

    for artifact_definition in artifact_reader.ReadFile(test_file):
      artifact_registry.RegisterDefinition(artifact_definition)

    # Make sure the test file got turned into artifacts.
    self.assertEqual(len(artifact_registry.GetDefinitions()), 7)

    artifact_definition = artifact_registry.GetDefinitionByName(u'EventLogs')
    self.assertIsNotNone(artifact_definition)

    # Try to register something already registered
    with self.assertRaises(KeyError):
      artifact_registry.RegisterDefinition(artifact_definition)

    # Deregister
    artifact_registry.DeregisterDefinition(artifact_definition)

    # Check it is gone
    with self.assertRaises(KeyError):
      artifact_registry.DeregisterDefinition(artifact_definition)

    self.assertEqual(len(artifact_registry.GetDefinitions()), 6)

    test_artifact_definition = artifact_registry.GetDefinitionByName(
        u'SecurityEventLogEvtx')
    self.assertIsNotNone(test_artifact_definition)

    self.assertEqual(test_artifact_definition.name, u'SecurityEventLogEvtx')

    expected_description = (
        u'Windows Security Event log for Vista or later systems.')
    self.assertEqual(test_artifact_definition.description, expected_description)

    bad_args = io.BytesIO(
        b'name: SecurityEventLogEvtx\n'
        b'doc: Windows Security Event log for Vista or later systems.\n'
        b'sources:\n'
        b'- type: FILE\n'
        b'  attributes: {broken: [\'%%environ_systemroot%%\\System32\\'
        b'winevt\\Logs\\Security.evtx\']}\n'
        b'conditions: [os_major_version >= 6]\n'
        b'labels: [Logs]\n'
        b'supported_os: [Windows]\n'
        b'urls: [\'http://www.forensicswiki.org/wiki/\n'
        b'Windows_XML_Event_Log_(EVTX)\']\n')

    generator = artifact_reader.ReadFileObject(bad_args)
    with self.assertRaises(errors.FormatError):
      next(generator)

  def testSourceTypeFunctions(self):
    """Tests the source type functions."""
    number_of_source_types = len(
        registry.ArtifactDefinitionsRegistry._source_type_classes)

    registry.ArtifactDefinitionsRegistry.RegisterSourceType(TestSourceType)

    self.assertEqual(
        len(registry.ArtifactDefinitionsRegistry._source_type_classes),
        number_of_source_types + 1)

    with self.assertRaises(KeyError):
      registry.ArtifactDefinitionsRegistry.RegisterSourceType(TestSourceType)

    registry.ArtifactDefinitionsRegistry.DeregisterSourceType(TestSourceType)

    self.assertEqual(
        len(registry.ArtifactDefinitionsRegistry._source_type_classes),
        number_of_source_types)

    registry.ArtifactDefinitionsRegistry.RegisterSourceTypes([TestSourceType])

    self.assertEqual(
        len(registry.ArtifactDefinitionsRegistry._source_type_classes),
        number_of_source_types + 1)

    with self.assertRaises(KeyError):
      registry.ArtifactDefinitionsRegistry.RegisterSourceTypes([TestSourceType])

    source_object = registry.ArtifactDefinitionsRegistry.CreateSourceType(
        u'test', {u'test': u'test123'})

    self.assertIsNotNone(source_object)
    self.assertEqual(source_object.test, u'test123')

    with self.assertRaises(errors.FormatError):
      source_object = registry.ArtifactDefinitionsRegistry.CreateSourceType(
          u'test', {})

    with self.assertRaises(errors.FormatError):
      source_object = registry.ArtifactDefinitionsRegistry.CreateSourceType(
          u'bogus', {})

    registry.ArtifactDefinitionsRegistry.DeregisterSourceType(TestSourceType)


if __name__ == '__main__':
  unittest.main()