1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the merge reader for SQLite storage files."""
from __future__ import unicode_literals
import os
import unittest
from plaso.containers import sessions
from plaso.containers import tasks
from plaso.lib import definitions
from plaso.storage.sqlite import merge_reader
from plaso.storage.sqlite import writer
from tests import test_lib as shared_test_lib
from tests.containers import test_lib as containers_test_lib
from tests.storage import test_lib
class SQLiteStorageMergeReaderTest(test_lib.StorageTestCase):
"""Tests for the SQLite-based storage file reader for merging."""
# pylint: disable=protected-access
_TEST_EVENTS_WITH_DESERIALIZATION_ERROR = [
{'data_type': 'windows:registry:key_value',
'key_path': 'MY AutoRun key',
'parser': 'UNKNOWN',
'timestamp': '2012-04-20 22:38:46.929596',
'timestamp_desc': definitions.TIME_DESCRIPTION_WRITTEN,
'values': 'Value: c:/Temp/evil.exe'}]
def _CreateTaskStorageFile(self, session, path, event_values_list):
"""Creates a task storage file for testing.
Args:
session (Session): session the task storage is part of.
path (str): path to the task storage file that should be merged.
event_values_list (list[dict[str, str]]): list of event values.
"""
task = tasks.Task(session_identifier=session.identifier)
storage_file = writer.SQLiteStorageFileWriter(
session, path, storage_type=definitions.STORAGE_TYPE_TASK, task=task)
storage_file.Open()
for event, event_data, event_data_stream in (
containers_test_lib.CreateEventsFromValues(event_values_list)):
storage_file.AddEventDataStream(event_data_stream)
event_data.SetEventDataStreamIdentifier(event_data_stream.GetIdentifier())
storage_file.AddEventData(event_data)
event.SetEventDataIdentifier(event_data.GetIdentifier())
storage_file.AddEvent(event)
storage_file.Close()
def testReadStorageMetadata(self):
"""Tests the _ReadStorageMetadata function."""
session = sessions.Session()
with shared_test_lib.TempDirectory() as temp_directory:
task_storage_path = os.path.join(temp_directory, 'task.sqlite')
self._CreateTaskStorageFile(session, task_storage_path, self._TEST_EVENTS)
session_storage_path = os.path.join(temp_directory, 'plaso.sqlite')
storage_writer = writer.SQLiteStorageFileWriter(
session, session_storage_path)
test_reader = merge_reader.SQLiteStorageMergeReader(
storage_writer, task_storage_path)
test_reader._Open()
test_reader._ReadStorageMetadata()
test_reader._Close()
def testMergeAttributeContainers(self):
"""Tests the MergeAttributeContainers function."""
session = sessions.Session()
with shared_test_lib.TempDirectory() as temp_directory:
task_storage_path = os.path.join(temp_directory, 'task.sqlite')
self._CreateTaskStorageFile(session, task_storage_path, self._TEST_EVENTS)
session_storage_path = os.path.join(temp_directory, 'plaso.sqlite')
storage_writer = writer.SQLiteStorageFileWriter(
session, session_storage_path)
test_reader = merge_reader.SQLiteStorageMergeReader(
storage_writer, task_storage_path)
storage_writer.Open()
result = test_reader.MergeAttributeContainers()
self.assertTrue(result)
storage_writer.Close()
def testMergeAttributeContainersWithDeserializationError(self):
"""Tests MergeAttributeContainers with a deserialization error."""
session = sessions.Session()
with shared_test_lib.TempDirectory() as temp_directory:
task_storage_path = os.path.join(temp_directory, 'task.sqlite')
self._CreateTaskStorageFile(
session, task_storage_path,
self._TEST_EVENTS_WITH_DESERIALIZATION_ERROR)
session_storage_path = os.path.join(temp_directory, 'plaso.sqlite')
storage_writer = writer.SQLiteStorageFileWriter(
session, session_storage_path)
test_reader = merge_reader.SQLiteStorageMergeReader(
storage_writer, task_storage_path)
storage_writer.Open()
result = test_reader.MergeAttributeContainers()
self.assertTrue(result)
storage_writer.Close()
if __name__ == '__main__':
unittest.main()
|