File: common.py

package info (click to toggle)
python-zstandard 0.23.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,936 kB
  • sloc: ansic: 41,411; python: 8,665; makefile: 22; sh: 14
file content (121 lines) | stat: -rw-r--r-- 3,265 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import io
import os

from typing import List


class NonClosingBytesIO(io.BytesIO):
    """A BytesIO variant whose contents survive close().

    The buffer is snapshotted immediately before the underlying storage
    is released, so getvalue() keeps working on a closed instance. This
    lets tests inspect written data after a writer closes its stream.
    """

    def __init__(self, *args, **kwargs):
        # Holds the final buffer contents once close() has run.
        self._saved_buffer = None
        super().__init__(*args, **kwargs)

    def close(self):
        # Capture the data before BytesIO discards its buffer.
        self._saved_buffer = self.getvalue()
        return super().close()

    def getvalue(self):
        if not self.closed:
            return super().getvalue()
        return self._saved_buffer


class CustomBytesIO(io.BytesIO):
    """BytesIO that counts flush/read/write calls and can inject failures.

    Assign an exception instance to ``flush_exception``,
    ``read_exception``, or ``write_exception`` to make the matching
    method raise it. The per-method counter is incremented before the
    exception check, so failed calls are still counted.
    """

    def __init__(self, *args, **kwargs):
        # Call counters, incremented even when an injected error fires.
        self._flush_count = 0
        self._read_count = 0
        self._write_count = 0
        # Fault-injection hooks; None means behave normally.
        self.flush_exception = None
        self.read_exception = None
        self.write_exception = None
        super().__init__(*args, **kwargs)

    def flush(self):
        self._flush_count += 1
        exc = self.flush_exception
        if exc:
            raise exc
        return super().flush()

    def read(self, *args):
        self._read_count += 1
        exc = self.read_exception
        if exc:
            raise exc
        return super().read(*args)

    def write(self, data):
        self._write_count += 1
        exc = self.write_exception
        if exc:
            raise exc
        return super().write(data)


_source_files = []  # type: List[bytes]


def random_input_data():
    """Obtain the raw content of source files.

    Used as "random" input for fuzzing, since reading existing files
    is faster than generating random content. Results are cached in
    the module-level ``_source_files`` list, so the filesystem walk
    only happens once per process.
    """
    if _source_files:
        return _source_files

    for root, dirs, files in os.walk(os.path.dirname(__file__)):
        # Drop __pycache__ to avoid racing another process that may be
        # writing cache files while we read. Sorting keeps the walk
        # order deterministic.
        dirs[:] = sorted(d for d in dirs if d != "__pycache__")

        for name in sorted(files):
            try:
                with open(os.path.join(root, name), "rb") as fh:
                    content = fh.read()
            except OSError:
                continue
            # Skip empty files and large ones; big inputs make fuzz
            # runs easily exceed their deadlines.
            if content and len(content) < 131072:
                _source_files.append(content)

    # Mix in genuinely random payloads across several orders of magnitude.
    for size in (100, 1000, 10000, 100000, 1000000):
        _source_files.append(os.urandom(size))

    return _source_files


def get_optimal_dict_size_heuristically(src):
    """Heuristic dictionary size: 1% of the combined sample length."""
    total_bytes = 0
    for sample in src:
        total_bytes += len(sample)
    return total_bytes // 100


def generate_samples():
    inputs = [
        b"foo" * 32,
        b"bar" * 16,
        b"abcdef" * 64,
        b"sometext" * 128,
        b"baz" * 512,
    ]

    samples = []

    for i in range(128):
        samples.append(inputs[i % 5])
        samples.append(inputs[i % 5] * (i + 3))
        samples.append(inputs[-(i % 5)] * (i + 2))

    return samples