import io
import os
from typing import List


class NonClosingBytesIO(io.BytesIO):
    """BytesIO that saves the underlying buffer on close().

    This allows us to access written data after close().
    """

    def __init__(self, *args, **kwargs):
        super(NonClosingBytesIO, self).__init__(*args, **kwargs)
        self._saved_buffer = None

    def close(self):
        # Snapshot the buffer before the base class releases it.
        self._saved_buffer = self.getvalue()
        return super(NonClosingBytesIO, self).close()

    def getvalue(self):
        if self.closed:
            return self._saved_buffer
        else:
            return super(NonClosingBytesIO, self).getvalue()
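

# Minimal usage sketch (illustrative only; `_example_non_closing_bytes_io` is
# a hypothetical helper, not used by the tests): data written before close()
# stays retrievable, which matters when a compressor closes its sink for us.
def _example_non_closing_bytes_io():
    fh = NonClosingBytesIO()
    fh.write(b"hello")
    fh.close()
    # A plain io.BytesIO would raise ValueError on getvalue() after close();
    # this subclass returns the saved bytes instead.
    assert fh.getvalue() == b"hello"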


class CustomBytesIO(io.BytesIO):
    """BytesIO that counts flush/read/write calls and can raise injected
    exceptions to simulate failing streams.
    """

    def __init__(self, *args, **kwargs):
        self._flush_count = 0
        self._read_count = 0
        self._write_count = 0
        self.flush_exception = None
        self.read_exception = None
        self.write_exception = None
        super(CustomBytesIO, self).__init__(*args, **kwargs)

    def flush(self):
        self._flush_count += 1
        if self.flush_exception:
            raise self.flush_exception
        return super(CustomBytesIO, self).flush()

    def read(self, *args):
        self._read_count += 1
        if self.read_exception:
            raise self.read_exception
        return super(CustomBytesIO, self).read(*args)

    def write(self, data):
        self._write_count += 1
        if self.write_exception:
            raise self.write_exception
        return super(CustomBytesIO, self).write(data)
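

# Illustrative sketch of how the counters and injected exceptions might be
# used (`_example_custom_bytes_io` is hypothetical, not part of the tests):
def _example_custom_bytes_io():
    fh = CustomBytesIO()
    fh.write(b"data")
    fh.flush()
    assert fh._write_count == 1
    assert fh._flush_count == 1

    # Injecting an exception simulates a destination stream that fails.
    fh.write_exception = IOError("simulated write failure")
    try:
        fh.write(b"more")
    except IOError:
        pass
    # The counter increments before the injected exception is raised.
    assert fh._write_count == 2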


_source_files = []  # type: List[bytes]


def random_input_data():
    """Obtain the raw content of source files.

    This is used for generating "random" data to feed into fuzzing, since it
    is faster than generating random content from scratch.
    """
    if _source_files:
        return _source_files

    for root, dirs, files in os.walk(os.path.dirname(__file__)):
        # Filter out __pycache__ because there is a race between another
        # process writing cache files and us reading them.
        dirs[:] = sorted(d for d in dirs if d != "__pycache__")
        for f in sorted(files):
            try:
                with open(os.path.join(root, f), "rb") as fh:
                    data = fh.read()
                    # Exclude large files because they can cause us to easily
                    # exceed deadlines during fuzz testing.
                    if data and len(data) < 131072:
                        _source_files.append(data)
            except OSError:
                pass

    # Also add some actual random data.
    _source_files.append(os.urandom(100))
    _source_files.append(os.urandom(1000))
    _source_files.append(os.urandom(10000))
    _source_files.append(os.urandom(100000))
    _source_files.append(os.urandom(1000000))

    return _source_files
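

# A sketch of how a fuzzing harness might consume this data (assumption:
# a hypothesis-based test; the strategy and test below are examples, not
# part of these helpers). Sampling pre-loaded buffers keeps per-case setup
# fast:
#
#   from hypothesis import given, strategies
#
#   @given(original=strategies.sampled_from(random_input_data()))
#   def test_roundtrip(original):
#       ...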


def get_optimal_dict_size_heuristically(src):
    # Roughly 1% of the total size of the samples.
    return sum(len(ch) for ch in src) // 100


def generate_samples():
    """Generate a deterministic list of repetitive samples."""
    inputs = [
        b"foo" * 32,
        b"bar" * 16,
        b"abcdef" * 64,
        b"sometext" * 128,
        b"baz" * 512,
    ]

    samples = []
    for i in range(128):
        samples.append(inputs[i % 5])
        samples.append(inputs[i % 5] * (i + 3))
        # Note: -(i % 5) is 0 when i is a multiple of 5, so this reuses
        # inputs[0] in that case rather than the last element.
        samples.append(inputs[-(i % 5)] * (i + 2))

    return samples
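

# Illustrative sketch tying the two helpers above together (assumes the
# `zstandard` package is available; `_example_train_dictionary` itself is
# hypothetical and not used by the tests):
def _example_train_dictionary():
    import zstandard

    samples = generate_samples()
    dict_size = get_optimal_dict_size_heuristically(samples)
    # Train a compression dictionary sized to ~1% of the total sample bytes.
    return zstandard.train_dictionary(dict_size, samples)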