
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the dynamic output module."""
from __future__ import unicode_literals
import unittest
from plaso.formatters import manager as formatters_manager
from plaso.lib import definitions
from plaso.output import dynamic
from tests.cli import test_lib as cli_test_lib
from tests.containers import test_lib as containers_test_lib
from tests.formatters import test_lib as formatters_test_lib
from tests.output import test_lib
class DynamicFieldFormattingHelperTest(test_lib.OutputModuleTestCase):
"""Test the dynamic field formatting helper."""
# pylint: disable=protected-access
_TEST_EVENTS = [
{'data_type': 'test:event',
'filename': 'log/syslog.1',
'hostname': 'ubuntu',
'text': (
'Reporter <CRON> PID: 8442 (pam_unix(cron:session): session\n '
'closed for user root)'),
'timestamp': '2012-06-27 18:17:01',
'timestamp_desc': definitions.TIME_DESCRIPTION_CHANGE}]
def testFormatDate(self):
"""Tests the _FormatDate function."""
output_mediator = self._CreateOutputMediator()
formatting_helper = dynamic.DynamicFieldFormattingHelper(output_mediator)
event, event_data, event_data_stream = (
containers_test_lib.CreateEventFromValues(self._TEST_EVENTS[0]))
date_string = formatting_helper._FormatDate(
event, event_data, event_data_stream)
self.assertEqual(date_string, '2012-06-27')
event.timestamp = -9223372036854775808
date_string = formatting_helper._FormatDate(
event, event_data, event_data_stream)
self.assertEqual(date_string, '0000-00-00')
def testFormatDateTime(self):
"""Tests the _FormatDateTime function."""
output_mediator = self._CreateOutputMediator()
formatting_helper = dynamic.DynamicFieldFormattingHelper(output_mediator)
event, event_data, event_data_stream = (
containers_test_lib.CreateEventFromValues(self._TEST_EVENTS[0]))
date_time_string = formatting_helper._FormatDateTime(
event, event_data, event_data_stream)
self.assertEqual(date_time_string, '2012-06-27T18:17:01+00:00')
event.timestamp = -9223372036854775808
date_time_string = formatting_helper._FormatDateTime(
event, event_data, event_data_stream)
self.assertEqual(date_time_string, '0000-00-00T00:00:00')
# TODO: add tests for _FormatFilename
def testFormatTimestampDescription(self):
"""Tests the _FormatTimestampDescription function."""
output_mediator = self._CreateOutputMediator()
formatting_helper = dynamic.DynamicFieldFormattingHelper(output_mediator)
event, event_data, event_data_stream = (
containers_test_lib.CreateEventFromValues(self._TEST_EVENTS[0]))
timestamp_description_string = (
formatting_helper._FormatTimestampDescription(
event, event_data, event_data_stream))
self.assertEqual(timestamp_description_string, 'Metadata Modification Time')
class DynamicOutputModuleTest(test_lib.OutputModuleTestCase):
"""Test the dynamic output module."""
# pylint: disable=protected-access
_TEST_EVENTS = [
{'data_type': 'test:event',
'filename': 'log/syslog.1',
'hostname': 'ubuntu',
'text': (
'Reporter <CRON> PID: 8442 (pam_unix(cron:session): session\n '
'closed for user root)'),
'timestamp': '2012-06-27 18:17:01',
'timestamp_desc': definitions.TIME_DESCRIPTION_CHANGE}]
def testWriteEventBody(self):
"""Tests the WriteEventBody function."""
output_mediator = self._CreateOutputMediator()
output_writer = cli_test_lib.TestOutputWriter()
output_module = dynamic.DynamicOutputModule(output_mediator)
output_module.SetFields([
'date', 'time', 'timezone', 'macb', 'source', 'sourcetype',
'type', 'user', 'host', 'message_short', 'message',
'filename', 'inode', 'notes', 'format', 'extra'])
output_module.SetOutputWriter(output_writer)
output_module.WriteHeader()
expected_header = (
'date,time,timezone,macb,source,sourcetype,type,user,host,'
'message_short,message,filename,inode,notes,format,extra\n')
header = output_writer.ReadOutput()
self.assertEqual(header, expected_header)
event, event_data, event_data_stream = (
containers_test_lib.CreateEventFromValues(self._TEST_EVENTS[0]))
formatters_manager.FormattersManager.RegisterFormatter(
formatters_test_lib.TestEventFormatter)
try:
output_module.WriteEventBody(event, event_data, event_data_stream, None)
finally:
formatters_manager.FormattersManager.DeregisterFormatter(
formatters_test_lib.TestEventFormatter)
expected_event_body = (
'2012-06-27,18:17:01,UTC,..C.,FILE,Test log file,Metadata '
'Modification Time,-,ubuntu,Reporter <CRON> PID: 8442 '
'(pam_unix(cron:session): session closed for user root),Reporter '
'<CRON> PID: 8442 (pam_unix(cron:session): session closed for user '
'root),log/syslog.1,-,-,-,-\n')
event_body = output_writer.ReadOutput()
self.assertEqual(event_body, expected_event_body)
output_mediator = self._CreateOutputMediator()
output_writer = cli_test_lib.TestOutputWriter()
output_module = dynamic.DynamicOutputModule(output_mediator)
output_module.SetFields([
'datetime', 'nonsense', 'hostname', 'message'])
output_module.SetOutputWriter(output_writer)
expected_header = 'datetime,nonsense,hostname,message\n'
output_module.WriteHeader()
header = output_writer.ReadOutput()
self.assertEqual(header, expected_header)
expected_event_body = (
'2012-06-27T18:17:01+00:00,-,ubuntu,Reporter <CRON> PID: 8442'
' (pam_unix(cron:session): session closed for user root)\n')
event, event_data, event_data_stream = (
containers_test_lib.CreateEventFromValues(self._TEST_EVENTS[0]))
formatters_manager.FormattersManager.RegisterFormatter(
formatters_test_lib.TestEventFormatter)
try:
output_module.WriteEventBody(event, event_data, event_data_stream, None)
finally:
formatters_manager.FormattersManager.DeregisterFormatter(
formatters_test_lib.TestEventFormatter)
event_body = output_writer.ReadOutput()
self.assertEqual(event_body, expected_event_body)
def testHeader(self):
"""Tests the WriteHeader function."""
output_mediator = self._CreateOutputMediator()
output_writer = cli_test_lib.TestOutputWriter()
output_module = dynamic.DynamicOutputModule(output_mediator)
output_module.SetOutputWriter(output_writer)
expected_header = (
'datetime,timestamp_desc,source,source_long,message,parser,'
'display_name,tag\n')
output_module.WriteHeader()
header = output_writer.ReadOutput()
self.assertEqual(header, expected_header)
output_mediator = self._CreateOutputMediator()
output_writer = cli_test_lib.TestOutputWriter()
output_module = dynamic.DynamicOutputModule(output_mediator)
output_module.SetFields([
'date', 'time', 'message', 'hostname', 'filename', 'some_stuff'])
output_module.SetOutputWriter(output_writer)
expected_header = 'date,time,message,hostname,filename,some_stuff\n'
output_module.WriteHeader()
header = output_writer.ReadOutput()
self.assertEqual(header, expected_header)
output_mediator = self._CreateOutputMediator()
output_writer = cli_test_lib.TestOutputWriter()
output_module = dynamic.DynamicOutputModule(output_mediator)
output_module.SetFields([
'date', 'time', 'message', 'hostname', 'filename', 'some_stuff'])
output_module.SetFieldDelimiter('@')
output_module.SetOutputWriter(output_writer)
expected_header = 'date@time@message@hostname@filename@some_stuff\n'
output_module.WriteHeader()
header = output_writer.ReadOutput()
self.assertEqual(header, expected_header)
if __name__ == '__main__':
unittest.main()
|