File: reader.py

package info (click to toggle)
plaso 20241006-3
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 673,224 kB
  • sloc: python: 91,831; sh: 557; xml: 97; makefile: 17; sql: 14; vhdl: 11
file content (160 lines) | stat: -rw-r--r-- 5,045 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the storage reader."""

import unittest

from acstore.containers import interface as containers_interface

from plaso.containers import event_sources
from plaso.storage import reader
from plaso.storage.fake import fake_store

from tests.storage import test_lib


class StorageReaderTest(test_lib.StorageTestCase):
  """Tests for the storage reader, using a fake store as its backend."""

  # pylint: disable=protected-access

  def _CreateTestReader(self, open_store=True):
    """Creates a storage reader that wraps a fake store.

    Args:
      open_store (Optional[bool]): True if the fake store should be opened
          before the storage reader is returned.

    Returns:
      StorageReader: storage reader with its _store set to a fake store.
    """
    storage_reader = reader.StorageReader()
    storage_reader._store = fake_store.FakeStore()
    if open_store:
      storage_reader._store.Open()
    return storage_reader

  def testInitialization(self):
    """Tests the __init__ function."""
    self.assertIsNotNone(reader.StorageReader())

  # TODO: add tests for __enter__ and __exit__
  # TODO: add tests for Close

  def testGetAttributeContainerByIdentifier(self):
    """Tests the GetAttributeContainerByIdentifier function."""
    storage_reader = self._CreateTestReader()

    try:
      container = event_sources.EventSource()
      storage_reader._store.AddAttributeContainer(container)
      container_type = container.CONTAINER_TYPE

      # Look up the container that was just added by its own identifier.
      known_identifier = container.GetIdentifier()
      self.assertIsNotNone(
          storage_reader.GetAttributeContainerByIdentifier(
              container_type, known_identifier))

      # Look up an identifier with a sequence number that was never added.
      unknown_identifier = containers_interface.AttributeContainerIdentifier(
          name=container_type, sequence_number=99)
      self.assertIsNone(
          storage_reader.GetAttributeContainerByIdentifier(
              container_type, unknown_identifier))

    finally:
      storage_reader._store.Close()

  def testGetAttributeContainerByIndex(self):
    """Tests the GetAttributeContainerByIndex function."""
    storage_reader = self._CreateTestReader()

    try:
      container = event_sources.EventSource()
      storage_reader._store.AddAttributeContainer(container)
      container_type = container.CONTAINER_TYPE

      # Index 0 refers to the single container that was added.
      self.assertIsNotNone(
          storage_reader.GetAttributeContainerByIndex(container_type, 0))

      # Index 99 is beyond the single container that was added.
      self.assertIsNone(
          storage_reader.GetAttributeContainerByIndex(container_type, 99))

    finally:
      storage_reader._store.Close()

  def testGetAttributeContainers(self):
    """Tests the GetAttributeContainers function."""
    storage_reader = self._CreateTestReader()

    try:
      container = event_sources.EventSource()
      storage_reader._store.AddAttributeContainer(container)

      # Exhaust the generator so the containers can be counted.
      containers = list(
          storage_reader.GetAttributeContainers(container.CONTAINER_TYPE))
      self.assertEqual(len(containers), 1)

    finally:
      storage_reader._store.Close()

  def testGetFormatVersion(self):
    """Tests the GetFormatVersion function."""
    storage_reader = self._CreateTestReader(open_store=False)

    self.assertIsNone(storage_reader.GetFormatVersion())

  def testGetNumberOfAttributeContainers(self):
    """Tests the GetNumberOfAttributeContainers function."""
    storage_reader = self._CreateTestReader()

    try:
      container = event_sources.EventSource()
      storage_reader._store.AddAttributeContainer(container)

      container_count = storage_reader.GetNumberOfAttributeContainers(
          container.CONTAINER_TYPE)
      self.assertEqual(container_count, 1)

    finally:
      storage_reader._store.Close()

  def testGetSerializationFormat(self):
    """Tests the GetSerializationFormat function."""
    storage_reader = self._CreateTestReader(open_store=False)

    self.assertIsNone(storage_reader.GetSerializationFormat())

  # TODO: add tests for GetSessions
  # TODO: add tests for GetSortedEvents

  def testHasAttributeContainers(self):
    """Tests the HasAttributeContainers function."""
    storage_reader = self._CreateTestReader()

    try:
      container = event_sources.EventSource()
      storage_reader._store.AddAttributeContainer(container)

      self.assertTrue(
          storage_reader.HasAttributeContainers(container.CONTAINER_TYPE))

    finally:
      storage_reader._store.Close()

  def testSetSerializersProfiler(self):
    """Tests the SetSerializersProfiler function."""
    storage_reader = self._CreateTestReader(open_store=False)

    # Only checks that the call completes without raising.
    storage_reader.SetSerializersProfiler(None)

  def testSetStorageProfiler(self):
    """Tests the SetStorageProfiler function."""
    storage_reader = self._CreateTestReader(open_store=False)

    # Only checks that the call completes without raising.
    storage_reader.SetStorageProfiler(None)


# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()