File: viper.py

package info (click to toggle)
plaso 20201007-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 519,924 kB
  • sloc: python: 79,002; sh: 629; xml: 72; sql: 14; vhdl: 11; makefile: 10
file content (137 lines) | stat: -rw-r--r-- 4,467 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tests for the Viper analysis plugin."""

from __future__ import unicode_literals

import unittest

try:
  import mock  # pylint: disable=import-error
except ImportError:
  from unittest import mock

from dfvfs.path import fake_path_spec

from plaso.analysis import viper
from plaso.lib import definitions

from tests.analysis import test_lib


class MockResponse(dict):
  """Dictionary that stands in for a requests library response object.

  The mapping itself carries the simulated JSON payload: a test fills it
  like a regular dict and reads the same content back through json().
  """

  # The method names below mirror the requests response interface and
  # therefore intentionally deviate from the style guide.
  # pylint: disable=invalid-name

  def raise_for_status(self):
    """Does nothing, this mocked response never signals an HTTP error."""
    return None

  def json(self):
    """Retrieves the simulated JSON payload.

    Returns:
      MockResponse: this object, which doubles as the decoded payload.
    """
    return self


class ViperTest(test_lib.AnalysisPluginTestCase):
  """Tests for the Viper analysis plugin."""

  # SHA-256 of the simulated sample. It is the hash of the test event and the
  # only hash the mocked Viper API request accepts.
  _EVENT_1_HASH = (
      '2d79fcc6b02a2e183a0cb30e0e25d103f42badda9fbf86bbee06f93aa3855aff')

  # Event values handed to the analysis plugin by the test.
  _TEST_EVENTS = [{
      'data_type': 'pe:compilation:compilation_time',
      'path_spec': fake_path_spec.FakePathSpec(
          location='C:\\WINDOWS\\system32\\evil.exe'),
      'pe_type': 'Executable (EXE)',
      'sha256_hash': _EVENT_1_HASH,
      'timestamp': '2015-01-01 17:00:00',
      'timestamp_desc': definitions.TIME_DESCRIPTION_UNKNOWN}]

  # pylint: disable=unused-argument
  def _MockPost(self, url, data=None):
    """Mock function to simulate a Viper API request.

    The test fails when the request does not carry the SHA-256 hash of the
    test event, which includes a request made without any form data.

    Args:
      url (str): URL being requested.
      data (Optional[dict[str, object]]): simulated form data for the Viper
          API request.

    Returns:
      MockResponse: mocked response that simulates a real response object
          returned by the requests library from the Viper API.
    """
    # data defaults to None; treat a missing form as empty so that the test
    # fails with the message below instead of raising AttributeError.
    sha256_hash = (data or {}).get('sha256')
    if sha256_hash != self._EVENT_1_HASH:
      self.fail('Unexpected data in request.post().')

    # The payload maps the key "default" to a tuple of matching samples.
    response = MockResponse()
    response['default'] = ({
        'sha1': '13da502ab0d75daca5e5075c60e81bfe3b7a637f',
        'name': 'darkcomet.exe',
        'tags': [
            'rat',
            'darkcomet'],
        'sha512': '7e81e0c4f49f1884ebebdf6e53531e7836721c2ae417'
                  '29cf5bc0340f3369e7d37fe4168a7434b2b0420b299f5c'
                  '1d9a4f482f1bda8e66e40345757d97e5602b2d',
        'created_at': '2015-03-30 23:13:20.595238',
        'crc32': '2238B48E',
        'ssdeep': '12288:D9HFJ9rJxRX1uVVjoaWSoynxdO1FVBaOiRZTERfIhNk'
                  'NCCLo9Ek5C/hlg:NZ1xuVVjfFoynPaVBUR8f+kN10EB/g',
        'sha256': '2d79fcc6b02a2e183a0cb30e0e25d103f42badda9fbf86bbee06f9'
                  '3aa3855aff',
        'type': 'PE32 executable (GUI) Intel 80386, for MS Windows',
        'id': 10,
        'md5': '9f2520a3056543d49bb0f822d85ce5dd',
        'size': 774144},)

    return response

  def setUp(self):
    """Makes preparations before running an individual test."""
    # Replace requests.post with the mock so that calls made during the test
    # are answered by _MockPost.
    self.requests_patcher = mock.patch('requests.post', self._MockPost)
    self.requests_patcher.start()

  def tearDown(self):
    """Cleans up after running an individual test."""
    # Restore the original requests.post.
    self.requests_patcher.stop()

  def testExamineEventAndCompileReport(self):
    """Tests the ExamineEvent and CompileReport functions."""
    plugin = viper.ViperAnalysisPlugin()
    plugin.SetHost('localhost')
    plugin.SetPort(8080)
    plugin.SetProtocol('http')

    storage_writer = self._AnalyzeEvents(self._TEST_EVENTS, plugin)

    # One report and one event tag are expected for the single test event.
    self.assertEqual(len(storage_writer.analysis_reports), 1)
    self.assertEqual(storage_writer.number_of_event_tags, 1)

    report = storage_writer.analysis_reports[0]
    self.assertIsNotNone(report)

    expected_text = (
        'viper hash tagging results\n'
        '1 path specifications tagged with label: viper_present\n'
        '1 path specifications tagged with label: viper_project_default\n'
        '1 path specifications tagged with label: viper_tag_darkcomet\n'
        '1 path specifications tagged with label: viper_tag_rat\n')

    self.assertEqual(report.text, expected_text)

    # Gather the labels of all event tags to compare them as one sorted list.
    labels = []
    for event_tag in storage_writer.GetEventTags():
      labels.extend(event_tag.labels)
    self.assertEqual(len(labels), 4)

    expected_labels = [
        'viper_present', 'viper_project_default', 'viper_tag_darkcomet',
        'viper_tag_rat']
    self.assertEqual(sorted(labels), expected_labels)


# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()