File: source_analyzer.py

package info (click to toggle)
dfvfs 20201219-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 284,900 kB
  • sloc: python: 30,025; vhdl: 1,921; sh: 465; makefile: 16
file content (344 lines) | stat: -rw-r--r-- 10,526 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Script to analyze a source device, file or directory."""

from __future__ import print_function
from __future__ import unicode_literals

import argparse
import getpass
import locale
import logging
import os
import sys

from dfvfs.credentials import manager as credentials_manager
from dfvfs.helpers import source_scanner
from dfvfs.lib import definitions


class SourceAnalyzer(object):
  """Analyzer to recursively check for volumes and file systems."""

  # Class constant that defines the default read buffer size.
  _READ_BUFFER_SIZE = 32768

  def __init__(self, auto_recurse=True):
    """Initializes a source analyzer.

    Args:
      auto_recurse (Optional[bool]): True if the scan should automatically
          recurse as far as possible.
    """
    super(SourceAnalyzer, self).__init__()
    self._auto_recurse = auto_recurse
    self._encode_errors = 'strict'
    self._preferred_encoding = locale.getpreferredencoding()
    self._source_scanner = source_scanner.SourceScanner()

  def _EncodeString(self, string):
    """Encodes a string in the preferred encoding.

    Returns:
      bytes: encoded string.
    """
    try:
      # Note that encode() will first convert string into a Unicode string
      # if necessary.
      encoded_string = string.encode(
          self._preferred_encoding, errors=self._encode_errors)
    except UnicodeEncodeError:
      if self._encode_errors == 'strict':
        logging.error(
            'Unable to properly write output due to encoding error. '
            'Switching to error tolerant encoding which can result in '
            'non Basic Latin (C0) characters to be replaced with "?" or '
            '"\\ufffd".')
        self._encode_errors = 'replace'

      encoded_string = string.encode(
          self._preferred_encoding, errors=self._encode_errors)

    return encoded_string

  def _PromptUserForEncryptedVolumeCredential(
      self, scan_context, locked_scan_node, output_writer):
    """Prompts the user to provide a credential for an encrypted volume.

    Args:
      scan_context (SourceScannerContext): the source scanner context.
      locked_scan_node (SourceScanNode): the locked scan node.
      output_writer (StdoutWriter): the output writer.
    """
    credentials = credentials_manager.CredentialsManager.GetCredentials(
        locked_scan_node.path_spec)

    # TODO: print volume description.
    if locked_scan_node.type_indicator == (
        definitions.TYPE_INDICATOR_APFS_CONTAINER):
      line = 'Found an APFS encrypted volume.'
    elif locked_scan_node.type_indicator == definitions.TYPE_INDICATOR_BDE:
      line = 'Found a BitLocker encrypted volume.'
    elif locked_scan_node.type_indicator == definitions.TYPE_INDICATOR_FVDE:
      line = 'Found a CoreStorage (FVDE) encrypted volume.'
    else:
      line = 'Found an encrypted volume.'

    output_writer.WriteLine(line)

    credentials_list = list(credentials.CREDENTIALS)
    credentials_list.append('skip')

    # TODO: check which credentials are available.
    output_writer.WriteLine('Supported credentials:')
    output_writer.WriteLine('')
    for index, name in enumerate(credentials_list):
      output_writer.WriteLine('  {0:d}. {1:s}'.format(index + 1, name))
    output_writer.WriteLine('')

    result = False
    while not result:
      output_writer.WriteString(
          'Select a credential to unlock the volume: ')
      # TODO: add an input reader.
      input_line = sys.stdin.readline()
      input_line = input_line.strip()

      if input_line in credentials_list:
        credential_identifier = input_line
      else:
        try:
          credential_identifier = int(input_line, 10)
          credential_identifier = credentials_list[credential_identifier - 1]
        except (IndexError, ValueError):
          output_writer.WriteLine(
              'Unsupported credential: {0:s}'.format(input_line))
          continue

      if credential_identifier == 'skip':
        break

      getpass_string = 'Enter credential data: '
      if sys.platform.startswith('win') and sys.version_info[0] < 3:
        # For Python 2 on Windows getpass (win_getpass) requires an encoded
        # byte string. For Python 3 we need it to be a Unicode string.
        getpass_string = self._EncodeString(getpass_string)

      credential_data = getpass.getpass(getpass_string)
      output_writer.WriteLine('')

      result = self._source_scanner.Unlock(
          scan_context, locked_scan_node.path_spec, credential_identifier,
          credential_data)

      if not result:
        output_writer.WriteLine('Unable to unlock volume.')
        output_writer.WriteLine('')

  def Analyze(self, source_path, output_writer):
    """Analyzes the source.

    Args:
      source_path (str): the source path.
      output_writer (StdoutWriter): the output writer.

    Raises:
      RuntimeError: if the source path does not exists, or if the source path
          is not a file or directory, or if the format of or within the source
          file is not supported.
    """
    if not os.path.exists(source_path):
      raise RuntimeError('No such source: {0:s}.'.format(source_path))

    scan_context = source_scanner.SourceScannerContext()
    scan_path_spec = None
    scan_step = 0

    scan_context.OpenSourcePath(source_path)

    while True:
      self._source_scanner.Scan(
          scan_context, auto_recurse=self._auto_recurse,
          scan_path_spec=scan_path_spec)

      if not scan_context.updated:
        break

      if not self._auto_recurse:
        output_writer.WriteScanContext(scan_context, scan_step=scan_step)
      scan_step += 1

      # The source is a directory or file.
      if scan_context.source_type in [
          definitions.SOURCE_TYPE_DIRECTORY, definitions.SOURCE_TYPE_FILE]:
        break

      # The source scanner found a locked volume, e.g. an encrypted volume,
      # and we need a credential to unlock the volume.
      for locked_scan_node in scan_context.locked_scan_nodes:
        self._PromptUserForEncryptedVolumeCredential(
            scan_context, locked_scan_node, output_writer)

      if not self._auto_recurse:
        scan_node = scan_context.GetUnscannedScanNode()
        if not scan_node:
          return
        scan_path_spec = scan_node.path_spec

    if self._auto_recurse:
      output_writer.WriteScanContext(scan_context)


class StdoutWriter(object):
  """Stdout output writer."""

  def Open(self):
    """Opens the output writer object.

    Returns:
      bool: True if open was successful or False if not.
    """
    return True

  def Close(self):
    """Closes the output writer object."""
    return

  def WriteLine(self, line):
    """Writes a line of text to stdout.

    Args:
      line (str): line of text without a new line indicator.
    """
    print(line)

  def WriteScanContext(self, scan_context, scan_step=None):
    """Writes the source scanner context to stdout.

    Args:
      scan_context (SourceScannerContext): the source scanner context.
      scan_step (Optional[int]): the scan step, where None represents no step.
    """
    if scan_step is not None:
      print('Scan step: {0:d}'.format(scan_step))

    print('Source type\t\t: {0:s}'.format(scan_context.source_type))
    print('')

    scan_node = scan_context.GetRootScanNode()
    self.WriteScanNode(scan_context, scan_node)
    print('')

  def WriteScanNode(self, scan_context, scan_node, indentation=''):
    """Writes the source scanner node to stdout.

    Args:
      scan_context (SourceScannerContext): the source scanner context.
      scan_node (SourceScanNode): the scan node.
      indentation (Optional[str]): indentation.
    """
    if not scan_node:
      return

    values = []

    part_index = getattr(scan_node.path_spec, 'part_index', None)
    if part_index is not None:
      values.append('{0:d}'.format(part_index))

    store_index = getattr(scan_node.path_spec, 'store_index', None)
    if store_index is not None:
      values.append('{0:d}'.format(store_index))

    start_offset = getattr(scan_node.path_spec, 'start_offset', None)
    if start_offset is not None:
      values.append('start offset: {0:d} (0x{0:08x})'.format(start_offset))

    location = getattr(scan_node.path_spec, 'location', None)
    if location is not None:
      values.append('location: {0:s}'.format(location))

    values = ', '.join(values)

    flags = ''
    if scan_node in scan_context.locked_scan_nodes:
      flags = ' [LOCKED]'

    print('{0:s}{1:s}: {2:s}{3:s}'.format(
        indentation, scan_node.path_spec.type_indicator, values, flags))

    indentation = '  {0:s}'.format(indentation)
    for sub_scan_node in scan_node.sub_nodes:
      self.WriteScanNode(scan_context, sub_scan_node, indentation=indentation)

  def WriteString(self, string):
    """Writes a string of text to stdout.

    Args:
      string (str): string of text.
    """
    print(string, end='')


def Main():
  """The main program function.

  Returns:
    bool: True if successful or False if not.
  """
  argument_parser = argparse.ArgumentParser(description=(
      'Calculates a message digest hash for every file in a directory or '
      'storage media image.'))

  argument_parser.add_argument(
      'source', nargs='?', action='store', metavar='image.raw', default=None,
      help=('path of the directory or filename of a storage media image '
            'containing the file.'))

  argument_parser.add_argument(
      '--no-auto-recurse', '--no_auto_recurse', dest='no_auto_recurse',
      action='store_true', default=False, help=(
          'Indicate that the source scanner should not auto-recurse.'))

  options = argument_parser.parse_args()

  if not options.source:
    print('Source value is missing.')
    print('')
    argument_parser.print_help()
    print('')
    return False

  logging.basicConfig(
      level=logging.INFO, format='[%(levelname)s] %(message)s')

  output_writer = StdoutWriter()

  if not output_writer.Open():
    print('Unable to open output writer.')
    print('')
    return False

  return_value = True
  source_analyzer = SourceAnalyzer(auto_recurse=not options.no_auto_recurse)

  try:
    source_analyzer.Analyze(options.source, output_writer)

    print('Completed.')

  except KeyboardInterrupt:
    return_value = False

    print('Aborted by user.')

  output_writer.Close()

  return return_value


if __name__ == '__main__':
  if not Main():
    sys.exit(1)
  else:
    sys.exit(0)