1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the SQLite-based storage writer."""
import os
import unittest
from plaso.containers import events
from plaso.lib import definitions
from plaso.storage.sqlite import writer as sqlite_writer
from tests import test_lib as shared_test_lib
from tests.storage import test_lib
from tests.containers import test_lib as containers_test_lib
class SQLiteStorageWriterTest(test_lib.StorageTestCase):
  """Tests for the SQLite-based storage writer."""

  # pylint: disable=protected-access

  def _AssertNumberOfContainers(
      self, storage_writer, container_type, expected_number):
    """Asserts how many attribute containers of a type the writer reports.

    Args:
      storage_writer (SQLiteStorageWriter): storage writer.
      container_type: CONTAINER_TYPE value of the attribute containers to
          count.
      expected_number (int): expected number of attribute containers.
    """
    number_of_containers = storage_writer.GetNumberOfAttributeContainers(
        container_type)
    self.assertEqual(number_of_containers, expected_number)

  def _AddTestEvents(self, storage_writer):
    """Adds the test events to the storage writer.

    For every test event the event data stream is added first, then the
    event data (after it was given the identifier of the event data stream)
    and finally the event (after it was given the identifier of the event
    data).

    Args:
      storage_writer (SQLiteStorageWriter): storage writer.

    Returns:
      list[EventObject]: test events.
    """
    generated_containers = containers_test_lib.CreateEventsFromValues(
        self._TEST_EVENTS)

    added_events = []
    for event, event_data, event_data_stream in generated_containers:
      storage_writer.AddAttributeContainer(event_data_stream)

      stream_identifier = event_data_stream.GetIdentifier()
      event_data.SetEventDataStreamIdentifier(stream_identifier)
      storage_writer.AddAttributeContainer(event_data)

      data_identifier = event_data.GetIdentifier()
      event.SetEventDataIdentifier(data_identifier)
      storage_writer.AddAttributeContainer(event)

      added_events.append(event)

    return added_events

  def testAddAttributeContainer(self):
    """Tests the AddAttributeContainer function."""
    container = events.EventDataStream()

    with shared_test_lib.TempDirectory() as temp_directory:
      database_path = os.path.join(temp_directory, 'plaso.sqlite')
      writer = sqlite_writer.SQLiteStorageWriter()
      writer.Open(path=database_path)

      try:
        container_type = container.CONTAINER_TYPE
        self._AssertNumberOfContainers(writer, container_type, 0)

        writer.AddAttributeContainer(container)
        self._AssertNumberOfContainers(writer, container_type, 1)

      finally:
        writer.Close()

      # Adding a container after the writer was closed is expected to fail.
      with self.assertRaises(IOError):
        writer.AddAttributeContainer(container)

  def testAddOrUpdateEventTag(self):
    """Tests the AddOrUpdateEventTag function."""
    with shared_test_lib.TempDirectory() as temp_directory:
      database_path = os.path.join(temp_directory, 'plaso.sqlite')
      writer = sqlite_writer.SQLiteStorageWriter()
      writer.Open(path=database_path)

      try:
        stored_events = self._AddTestEvents(writer)

        # Tagging an event for the first time is expected to add a tag.
        first_tag = events.EventTag()
        first_tag.SetEventIdentifier(stored_events[1].GetIdentifier())
        first_tag.AddLabel('Label1')
        tag_type = first_tag.CONTAINER_TYPE

        self._AssertNumberOfContainers(writer, tag_type, 0)
        writer.AddOrUpdateEventTag(first_tag)
        self._AssertNumberOfContainers(writer, tag_type, 1)

        # Tagging a different event is expected to add a second tag.
        second_tag = events.EventTag()
        second_tag.SetEventIdentifier(stored_events[2].GetIdentifier())
        second_tag.AddLabel('Label2')

        writer.AddOrUpdateEventTag(second_tag)
        self._AssertNumberOfContainers(writer, tag_type, 2)

        # Tagging the first event again is expected to extend its existing
        # tag with the new label instead of adding a third tag.
        repeated_tag = events.EventTag()
        repeated_tag.SetEventIdentifier(stored_events[1].GetIdentifier())
        repeated_tag.AddLabel('AnotherLabel1')

        writer.AddOrUpdateEventTag(repeated_tag)
        self._AssertNumberOfContainers(writer, tag_type, 2)

        stored_tags = list(writer.GetAttributeContainers(tag_type))
        self.assertEqual(stored_tags[0].labels, ['Label1', 'AnotherLabel1'])
        self.assertEqual(stored_tags[1].labels, ['Label2'])

      finally:
        writer.Close()

  # TODO: add tests for GetFirstWrittenEventSource
  # TODO: add tests for GetNextWrittenEventSource

  def testGetSortedEvents(self):
    """Tests the GetSortedEvents function."""
    with shared_test_lib.TempDirectory() as temp_directory:
      database_path = os.path.join(temp_directory, 'plaso.sqlite')
      writer = sqlite_writer.SQLiteStorageWriter()
      writer.Open(path=database_path)

      try:
        self._AddTestEvents(writer)

        sorted_events = list(writer.GetSortedEvents())
        self.assertEqual(len(sorted_events), 4)

      finally:
        writer.Close()

    # TODO: add test with time range.

  def testOpenClose(self):
    """Tests the Open and Close functions."""
    with shared_test_lib.TempDirectory() as temp_directory:
      database_path = os.path.join(temp_directory, 'plaso.sqlite')

      # Create the storage file, then reopen it with the same writer.
      writer = sqlite_writer.SQLiteStorageWriter()
      writer.Open(path=database_path)
      writer.Close()

      writer.Open(path=database_path)
      writer.Close()

      # Repeat with a writer that uses the task storage type.
      writer = sqlite_writer.SQLiteStorageWriter(
          storage_type=definitions.STORAGE_TYPE_TASK)
      writer.Open(path=database_path)
      writer.Close()

      writer.Open(path=database_path)

      # Opening a writer that is already open is expected to fail.
      with self.assertRaises(IOError):
        writer.Open(path=database_path)

      writer.Close()

      # Closing a writer that is already closed is expected to fail.
      with self.assertRaises(IOError):
        writer.Close()
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
|