# Run a fuzz test to verify robustness against corrupted/malicious data.
import sys
import time
import zipfile
import random
import subprocess
Import("env", "malloc_env")
def set_pkgname(src, dst, pkgname):
    """Copy a .proto template to *dst*, filling in its package name.

    Reads the text of *src*, replaces every occurrence of the
    ``// package name placeholder`` marker with ``package <pkgname>;`` and
    writes the result to *dst*.

    src and dst are passed through ``str()`` before being opened, so they
    may be plain paths or any object whose string form is a path.

    Raises AssertionError if the placeholder is not present in *src*; in
    that case *dst* is not written.
    """
    placeholder = '// package name placeholder'

    with open(str(src)) as infile:
        data = infile.read()

    # Raised explicitly rather than via ``assert`` so that the check is not
    # stripped when Python runs with -O.
    if placeholder not in data:
        raise AssertionError("%r not found in %s" % (placeholder, str(src)))

    with open(str(dst), 'w') as outfile:
        outfile.write(data.replace(placeholder, 'package %s;' % pkgname))
# We want both pointer and static versions of the AllTypes message, the same
# for the proto3 version, and also a callback version. Each variant is a copy
# of the shared .proto template with its own package name filled in.
def _set_pkgname_action(pkgname):
    """Return a Command action that runs set_pkgname() with *pkgname*."""
    def set_pkgname_action(target, source, env):
        set_pkgname(source[0], target[0], pkgname)
    return set_pkgname_action

# (package / generated file base name, template .proto it is derived from)
_alltypes_variants = [
    ("alltypes_static", "#alltypes/alltypes.proto"),
    ("alltypes_pointer", "#alltypes/alltypes.proto"),
    ("alltypes_proto3_static", "#alltypes_proto3/alltypes.proto"),
    ("alltypes_proto3_pointer", "#alltypes_proto3/alltypes.proto"),
    ("alltypes_callback", "#alltypes/alltypes.proto"),
]

for _variant, _template in _alltypes_variants:
    # Generate <variant>.proto from the template, then hand it to the
    # NanopbProto builder together with its matching .options file.
    env.Command(_variant + ".proto", _template, _set_pkgname_action(_variant))
    env.NanopbProto([_variant, _variant + ".options"])
# Objects linked into every binary built below: the local helper sources and
# the generated message sources, compiled in env, plus the prebuilt malloc
# wrappers object from $COMMON.
common_objs = [env.Object(c_source) for c_source in (
    "random_data.c",
    "validation.c",
    "flakystream.c",
    "alltypes_pointer.pb.c",
    "alltypes_static.pb.c",
    "alltypes_callback.pb.c",
    "alltypes_proto3_pointer.pb.c",
    "alltypes_proto3_static.pb.c",
)]
common_objs.append("$COMMON/malloc_wrappers.o")

# Core encode/decode/common objects, "with_malloc" flavour.
objs_malloc = [
    "$COMMON/pb_encode_with_malloc.o",
    "$COMMON/pb_decode_with_malloc.o",
    "$COMMON/pb_common_with_malloc.o",
]
objs_malloc += common_objs

# Core encode/decode/common objects, plain flavour.
objs_static = [
    "$COMMON/pb_encode.o",
    "$COMMON/pb_decode.o",
    "$COMMON/pb_common.o",
]
objs_static += common_objs
# Stand-alone fuzz tester, built in malloc_env from fuzztest.c and objs_malloc.
fuzz = malloc_env.Program(["fuzztest.c"] + objs_malloc)

# Run the stand-alone fuzz tester. The seed is the current time, so it
# differs between runs; fewer iterations are used when EMBEDDED is set.
seed = int(time.time())
iterations = 100 if env.get('EMBEDDED') else 1000
fuzz_args = [str(seed), str(iterations)]
env.RunTest(fuzz, ARGS = fuzz_args)

# Test the message generator with the same seed.
generate_message = malloc_env.Program(["generate_message.c"] + objs_static)
generate_args = [str(seed)]
env.RunTest(generate_message, ARGS = generate_args)

# Presumably feeds the generator's output file to the fuzz tester -- the
# exact semantics depend on the RunTest builder defined elsewhere.
env.RunTest("generate_message.output.fuzzed", [fuzz, "generate_message.output"])
# Run against the latest corpus from ossfuzz
# This allows quick testing against regressions and also lets us more
# completely test slow embedded targets. To reduce runtime, only a subset
# of the corpus is fuzzed each time.
def run_against_corpus(target, source, env):
    """Run the fuzz test program against a random sample of a zipped corpus.

    source[0] is the program to execute, source[1] is the corpus zip archive
    and target[0] is a file that receives the number of entries run once all
    of them have passed. Each sampled archive member is fed to the program
    on stdin, truncated to FUZZTEST_BUFSIZE bytes (taken from CPPDEFINES,
    default 256 kB).

    env is used like a mapping:
      TEST_RUNNER                 optional command prepended to the program
      FUZZTEST_CORPUS_SAMPLESIZE  optional number of entries to sample
      EMBEDDED                    if true, the default sample size is 100
                                  instead of 4096

    Returns None on success. On the first run that exits with a non-zero
    status, prints the captured output and returns that status without
    writing target[0].

    Raises OSError if the program cannot be started, or on any OSError
    other than errno 22 while talking to it.
    """
    args = [str(source[0])]
    if "TEST_RUNNER" in env:
        args = [env["TEST_RUNNER"]] + args

    if "FUZZTEST_CORPUS_SAMPLESIZE" in env:
        samplesize = int(env["FUZZTEST_CORPUS_SAMPLESIZE"])
    elif env.get('EMBEDDED'):
        samplesize = 100
    else:
        samplesize = 4096

    # The input size limit does not change between entries, look it up once.
    maxsize = env.get('CPPDEFINES', {}).get('FUZZTEST_BUFSIZE', 256*1024)

    with zipfile.ZipFile(str(source[1]), 'r') as corpus:
        # Skip directory entries; to reduce runtime only a random subset of
        # the remaining files is fuzzed each time.
        files = [n for n in corpus.namelist() if not n.endswith('/')]
        files = random.sample(files, min(samplesize, len(files)))

        for count, filename in enumerate(files):
            sys.stdout.write("Fuzzing: %5d/%5d: %-40.40s\r" % (count, len(files), filename))
            sys.stdout.flush()

            data_in = corpus.read(filename)[:maxsize]

            # Pre-set so the error path below never sees unbound (or stale,
            # previous-iteration) values.
            process = None
            stdout = stderr = b''
            try:
                process = subprocess.Popen(args, stdin=subprocess.PIPE,
                                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                stdout, stderr = process.communicate(input = data_in)
                result = process.wait()
            except OSError as e:
                # errno 22 while talking to an already started process is
                # tolerated: warn and use the exit status. Anything else,
                # including a failure to start the process, is fatal.
                if e.errno != 22 or process is None:
                    raise
                print("Warning: OSError 22 when running with input " + filename)
                result = process.wait()

            if result != 0:
                stdout += stderr
                print(stdout)
                print('\033[31m[FAIL]\033[0m Program ' + str(args) + ' returned ' + str(result) + ' with input ' + filename + ' from ' + str(source[1]))
                return result

    count = len(files)
    with open(str(target[0]), 'w') as stamp:
        stamp.write(str(count))
    print('\033[32m[ OK ]\033[0m Ran ' + str(args) + " against " + str(source[1]) + " (" + str(count) + " entries)")
# Register one corpus run per bundled archive; the target of each is the
# archive name with ".fuzzed" appended.
for corpus_zip in ("corpus.zip", "regressions.zip"):
    env.Command(corpus_zip + ".fuzzed", [fuzz, corpus_zip], run_against_corpus)
# Build separate fuzzers for each test case.
# Having them separate speeds up control flow based fuzzer engines.
# These are mainly used by oss-fuzz project.
def _build_case_fuzzer(base_env, case, objs):
    """Build the fuzztest_<case> program and return the environment used.

    base_env is cloned, FUZZTEST_<CASE> (upper-cased) is appended to its
    CPPDEFINES with value '1', and fuzztest.c is compiled in that clone to
    fuzztest_<case>.o and linked together with objs.
    """
    case_env = base_env.Clone()
    case_env.Append(CPPDEFINES = {'FUZZTEST_' + case.upper(): '1'})
    case_env.Program("fuzztest_" + case,
                     [case_env.Object("fuzztest_%s.o" % case, "fuzztest.c")] + objs)
    return case_env

# Static cases are built from env with objs_static, pointer and I/O error
# cases from malloc_env with objs_malloc.
env_proto2_static = _build_case_fuzzer(env, "proto2_static", objs_static)
env_proto2_pointer = _build_case_fuzzer(malloc_env, "proto2_pointer", objs_malloc)
env_proto3_static = _build_case_fuzzer(env, "proto3_static", objs_static)
env_proto3_pointer = _build_case_fuzzer(malloc_env, "proto3_pointer", objs_malloc)
env_io_errors = _build_case_fuzzer(malloc_env, "io_errors", objs_malloc)
|