1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
|
import imp
import inspect
import io
import os
import types
import unittest
try:
import hypothesis
except ImportError:
hypothesis = None
class TestCase(unittest.TestCase):
if not getattr(unittest.TestCase, "assertRaisesRegex", False):
assertRaisesRegex = unittest.TestCase.assertRaisesRegexp
def make_cffi(cls):
"""Decorator to add CFFI versions of each test method."""
# The module containing this class definition should
# `import zstandard as zstd`. Otherwise things may blow up.
mod = inspect.getmodule(cls)
if not hasattr(mod, "zstd"):
raise Exception('test module does not contain "zstd" symbol')
if not hasattr(mod.zstd, "backend"):
raise Exception(
'zstd symbol does not have "backend" attribute; did '
"you `import zstandard as zstd`?"
)
# If `import zstandard` already chose the cffi backend, there is nothing
# for us to do: we only add the cffi variation if the default backend
# is the C extension.
if mod.zstd.backend == "cffi":
return cls
old_env = dict(os.environ)
os.environ["PYTHON_ZSTANDARD_IMPORT_POLICY"] = "cffi"
try:
try:
mod_info = imp.find_module("zstandard")
mod = imp.load_module("zstandard_cffi", *mod_info)
except ImportError:
return cls
finally:
os.environ.clear()
os.environ.update(old_env)
if mod.backend != "cffi":
raise Exception(
"got the zstandard %s backend instead of cffi" % mod.backend
)
# If CFFI version is available, dynamically construct test methods
# that use it.
for attr in dir(cls):
fn = getattr(cls, attr)
if not inspect.ismethod(fn) and not inspect.isfunction(fn):
continue
if not fn.__name__.startswith("test_"):
continue
name = "%s_cffi" % fn.__name__
# Replace the "zstd" symbol with the CFFI module instance. Then copy
# the function object and install it in a new attribute.
if isinstance(fn, types.FunctionType):
globs = dict(fn.__globals__)
globs["zstd"] = mod
new_fn = types.FunctionType(
fn.__code__, globs, name, fn.__defaults__, fn.__closure__
)
new_method = new_fn
else:
globs = dict(fn.__func__.func_globals)
globs["zstd"] = mod
new_fn = types.FunctionType(
fn.__func__.func_code,
globs,
name,
fn.__func__.func_defaults,
fn.__func__.func_closure,
)
new_method = types.UnboundMethodType(
new_fn, fn.im_self, fn.im_class
)
setattr(cls, name, new_method)
return cls
class NonClosingBytesIO(io.BytesIO):
"""BytesIO that saves the underlying buffer on close().
This allows us to access written data after close().
"""
def __init__(self, *args, **kwargs):
super(NonClosingBytesIO, self).__init__(*args, **kwargs)
self._saved_buffer = None
def close(self):
self._saved_buffer = self.getvalue()
return super(NonClosingBytesIO, self).close()
def getvalue(self):
if self.closed:
return self._saved_buffer
else:
return super(NonClosingBytesIO, self).getvalue()
class OpCountingBytesIO(NonClosingBytesIO):
def __init__(self, *args, **kwargs):
self._flush_count = 0
self._read_count = 0
self._write_count = 0
return super(OpCountingBytesIO, self).__init__(*args, **kwargs)
def flush(self):
self._flush_count += 1
return super(OpCountingBytesIO, self).flush()
def read(self, *args):
self._read_count += 1
return super(OpCountingBytesIO, self).read(*args)
def write(self, data):
self._write_count += 1
return super(OpCountingBytesIO, self).write(data)
_source_files = []
def random_input_data():
"""Obtain the raw content of source files.
This is used for generating "random" data to feed into fuzzing, since it is
faster than random content generation.
"""
if _source_files:
return _source_files
for root, dirs, files in os.walk(os.path.dirname(__file__)):
dirs[:] = list(sorted(dirs))
for f in sorted(files):
try:
with open(os.path.join(root, f), "rb") as fh:
data = fh.read()
if data:
_source_files.append(data)
except OSError:
pass
# Also add some actual random data.
_source_files.append(os.urandom(100))
_source_files.append(os.urandom(1000))
_source_files.append(os.urandom(10000))
_source_files.append(os.urandom(100000))
_source_files.append(os.urandom(1000000))
return _source_files
def generate_samples():
inputs = [
b"foo",
b"bar",
b"abcdef",
b"sometext",
b"baz",
]
samples = []
for i in range(128):
samples.append(inputs[i % 5])
samples.append(inputs[i % 5] * (i + 3))
samples.append(inputs[-(i % 5)] * (i + 2))
return samples
if hypothesis:
default_settings = hypothesis.settings(deadline=10000)
hypothesis.settings.register_profile("default", default_settings)
ci_settings = hypothesis.settings(deadline=20000, max_examples=1000)
hypothesis.settings.register_profile("ci", ci_settings)
expensive_settings = hypothesis.settings(deadline=None, max_examples=10000)
hypothesis.settings.register_profile("expensive", expensive_settings)
hypothesis.settings.load_profile(
os.environ.get("HYPOTHESIS_PROFILE", "default")
)
|