File: decompressor_test.py

# Copyright 2016 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
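
"""Tests for the streaming brotli.Decompressor interface."""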

import functools
import os
import unittest

import brotli

from . import _test_utils


def _get_original_name(test_data):
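  # Drop the '.compressed' suffix (and anything after it) to get the original file name.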
  return test_data.split('.compressed')[0]


class TestDecompressor(_test_utils.TestCase):

  CHUNK_SIZE = 1  # Feed the compressed input one byte at a time.
  MIN_OUTPUT_BUFFER_SIZE = 32768  # The actual minimum is several bytes less.

  def setUp(self):
    # super().setUp()  # Requires Py3+
    self.decompressor = brotli.Decompressor()

  def tearDown(self):
    self.decompressor = None
    # super().tearDown()  # Requires Py3+

  def _check_decompression(self, test_data):
    # Verify decompression matches the original.
    temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
    original = _get_original_name(test_data)
    self.assert_files_match(temp_uncompressed, original)

  def _decompress(self, test_data):
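    # Stream the compressed file through process() in CHUNK_SIZE-byte pieces and
    # check that the decoder reports the stream as finished at the end.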
    temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
    with open(temp_uncompressed, 'wb') as out_file:
      with open(test_data, 'rb') as in_file:
        read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
        for data in iter(read_chunk, b''):
          out_file.write(self.decompressor.process(data))
    self.assertTrue(self.decompressor.is_finished())

  def _decompress_with_limit(self, test_data):
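    # Decompress with an output_buffer_limit, feeding a new input chunk only
    # when can_accept_more_data() allows it.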
    output_buffer_limit = 10922
    temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
    with open(temp_uncompressed, 'wb') as out_file:
      with open(test_data, 'rb') as in_file:
        chunk_iter = iter(functools.partial(in_file.read, 10 * 1024), b'')
        while not self.decompressor.is_finished():
          data = b''
          if self.decompressor.can_accept_more_data():
            data = next(chunk_iter, b'')
          decompressed_data = self.decompressor.process(
              data, output_buffer_limit=output_buffer_limit
          )
          self.assertLessEqual(
              len(decompressed_data), self.MIN_OUTPUT_BUFFER_SIZE
          )
          out_file.write(decompressed_data)
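        # All input chunks should have been consumed once the stream is finished.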
        self.assertIsNone(next(chunk_iter, None))

  def _test_decompress(self, test_data):
    self._decompress(test_data)
    self._check_decompression(test_data)

  def _test_decompress_with_limit(self, test_data):
    self._decompress_with_limit(test_data)
    self._check_decompression(test_data)

  def test_too_much_input(self):
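    # With output still pending (due to output_buffer_limit), the decompressor
    # cannot accept more input; feeding it anyway must raise brotli.error.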
    with open(
        os.path.join(_test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed'), 'rb'
    ) as in_file:
      compressed = in_file.read()
      self.decompressor.process(compressed[:-1], output_buffer_limit=10240)
      # The following assertion verifies that the test setup is correct.
      self.assertFalse(self.decompressor.can_accept_more_data())
      with self.assertRaises(brotli.error):
        self.decompressor.process(compressed[-1:])

  def test_changing_limit(self):
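    # Start with a small output_buffer_limit, drain the pending output via
    # process(b''), then finish the stream with no limit on the final call.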
    test_data = os.path.join(
        _test_utils.TESTDATA_DIR, 'zerosukkanooa.compressed'
    )
    check_output = os.path.exists(test_data.replace('.compressed', ''))
    temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
    with open(temp_uncompressed, 'wb') as out_file:
      with open(test_data, 'rb') as in_file:
        compressed = in_file.read()
        uncompressed = self.decompressor.process(
            compressed[:-1], output_buffer_limit=10240
        )
        self.assertLessEqual(len(uncompressed), self.MIN_OUTPUT_BUFFER_SIZE)
        out_file.write(uncompressed)
        while not self.decompressor.can_accept_more_data():
          out_file.write(self.decompressor.process(b''))
        out_file.write(self.decompressor.process(compressed[-1:]))
    if check_output:
      self._check_decompression(test_data)

  def test_garbage_appended(self):
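    # Trailing garbage after a complete brotli stream must raise brotli.error.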
    with self.assertRaises(brotli.error):
      self.decompressor.process(brotli.compress(b'a') + b'a')

  def test_already_finished(self):
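    # Feeding more data after the stream has been fully decoded must raise brotli.error.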
    self.decompressor.process(brotli.compress(b'a'))
    with self.assertRaises(brotli.error):
      self.decompressor.process(b'a')


# Generate test methods on TestDecompressor from the _test_* helpers above.
_test_utils.generate_test_methods(TestDecompressor, for_decompression=True)

if __name__ == '__main__':
  unittest.main()