1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
|
import pytest, sys
try:
# comment out the following line to run this test.
# the latest on x86-64 linux: https://github.com/libffi/libffi/issues/574
if sys.platform != 'win32':
raise ImportError("this test is skipped because it keeps finding "
"failures in libffi, instead of cffi")
from hypothesis import given, settings, example
from hypothesis import strategies as st
except ImportError as e:
e1 = e
def test_types():
pytest.skip(str(e1))
else:
from cffi import FFI
import sys, random
from .test_recompiler import verify
ALL_PRIMITIVES = [
'unsigned char',
'short',
'int',
'long',
'long long',
'float',
'double',
#'long double', --- on x86 it can give libffi crashes
]
def _make_struct(s):
return st.lists(s, min_size=1)
types = st.one_of(st.sampled_from(ALL_PRIMITIVES),
st.lists(st.sampled_from(ALL_PRIMITIVES), min_size=1))
# NB. 'types' could be st.recursive instead, but it doesn't
# really seem useful
def draw_primitive(ffi, typename):
value = random.random() * 2**40
if typename != 'long double':
return ffi.cast(typename, value)
else:
return value
TEST_RUN_COUNTER = 0
@given(st.lists(types), types)
@settings(max_examples=100, deadline=5000) # 5000ms
def test_types(tp_args, tp_result):
global TEST_RUN_COUNTER
print(tp_args, tp_result)
cdefs = []
structs = {}
def build_type(tp):
if type(tp) is list:
field_types = [build_type(tp1) for tp1 in tp]
fields = ['%s f%d;' % (ftp, j)
for (j, ftp) in enumerate(field_types)]
fields = '\n '.join(fields)
name = 's%d' % len(cdefs)
cdefs.append("typedef struct {\n %s\n} %s;" % (fields, name))
structs[name] = field_types
return name
else:
return tp
args = [build_type(tp) for tp in tp_args]
result = build_type(tp_result)
TEST_RUN_COUNTER += 1
signature = "%s testfargs(%s)" % (result,
', '.join(['%s a%d' % (arg, i) for (i, arg) in enumerate(args)])
or 'void')
source = list(cdefs)
cdefs.append("%s;" % signature)
cdefs.append("extern %s testfargs_result;" % result)
for i, arg in enumerate(args):
cdefs.append("extern %s testfargs_arg%d;" % (arg, i))
source.append("%s testfargs_result;" % result)
for i, arg in enumerate(args):
source.append("%s testfargs_arg%d;" % (arg, i))
source.append(signature)
source.append("{")
for i, arg in enumerate(args):
source.append(" testfargs_arg%d = a%d;" % (i, i))
source.append(" return testfargs_result;")
source.append("}")
typedef_line = "typedef %s;" % (signature.replace('testfargs',
'(*mycallback_t)'),)
assert signature.endswith(')')
sig_callback = "%s testfcallback(mycallback_t callback)" % result
cdefs.append(typedef_line)
cdefs.append("%s;" % sig_callback)
source.append(typedef_line)
source.append(sig_callback)
source.append("{")
source.append(" return callback(%s);" %
', '.join(["testfargs_arg%d" % i for i in range(len(args))]))
source.append("}")
ffi = FFI()
ffi.cdef("\n".join(cdefs))
lib = verify(ffi, 'test_function_args_%d' % TEST_RUN_COUNTER,
"\n".join(source), no_cpp=True)
# when getting segfaults, enable this:
if False:
from testing.udir import udir
import subprocess
f = open(str(udir.join('run1.py')), 'w')
f.write('import sys; sys.path = %r\n' % (sys.path,))
f.write('from _CFFI_test_function_args_%d import ffi, lib\n' %
TEST_RUN_COUNTER)
for i in range(len(args)):
f.write('a%d = ffi.new("%s *")\n' % (i, args[i]))
aliststr = ', '.join(['a%d[0]' % i for i in range(len(args))])
f.write('lib.testfargs(%s)\n' % aliststr)
f.write('ffi.addressof(lib, "testfargs")(%s)\n' % aliststr)
f.close()
print("checking for segfault for direct call...")
rc = subprocess.call([sys.executable, 'run1.py'], cwd=str(udir))
assert rc == 0, rc
def make_arg(tp):
if tp in structs:
return [make_arg(tp1) for tp1 in structs[tp]]
else:
return draw_primitive(ffi, tp)
passed_args = [make_arg(arg) for arg in args]
returned_value = make_arg(result)
def write(p, v):
if type(v) is list:
for i, v1 in enumerate(v):
write(ffi.addressof(p, 'f%d' % i), v1)
else:
p[0] = v
write(ffi.addressof(lib, 'testfargs_result'), returned_value)
## CALL forcing libffi
print("CALL forcing libffi")
received_return = ffi.addressof(lib, 'testfargs')(*passed_args)
##
_tp_long_double = ffi.typeof("long double")
def check(p, v):
if type(v) is list:
for i, v1 in enumerate(v):
check(ffi.addressof(p, 'f%d' % i), v1)
else:
if ffi.typeof(p).item is _tp_long_double:
assert ffi.cast("double", p[0]) == v
else:
assert p[0] == v
for i, arg in enumerate(passed_args):
check(ffi.addressof(lib, 'testfargs_arg%d' % i), arg)
ret = ffi.new(result + "*", received_return)
check(ret, returned_value)
## CALLBACK
def expand(value):
if isinstance(value, ffi.CData):
t = ffi.typeof(value)
if t is _tp_long_double:
return float(ffi.cast("double", value))
return [expand(getattr(value, 'f%d' % i))
for i in range(len(t.fields))]
else:
return value
# when getting segfaults, enable this:
if False:
from testing.udir import udir
import subprocess
f = open(str(udir.join('run1.py')), 'w')
f.write('import sys; sys.path = %r\n' % (sys.path,))
f.write('from _CFFI_test_function_args_%d import ffi, lib\n' %
TEST_RUN_COUNTER)
f.write('def callback(*args): return ffi.new("%s *")[0]\n' % result)
f.write('fptr = ffi.callback("%s(%s)", callback)\n' % (result,
','.join(args)))
f.write('print(lib.testfcallback(fptr))\n')
f.close()
print("checking for segfault for callback...")
rc = subprocess.call([sys.executable, 'run1.py'], cwd=str(udir))
assert rc == 0, rc
seen_args = []
def callback(*args):
seen_args.append([expand(arg) for arg in args])
return returned_value
fptr = ffi.callback("%s(%s)" % (result, ','.join(args)), callback)
print("CALL with callback")
received_return = lib.testfcallback(fptr)
assert len(seen_args) == 1
assert passed_args == seen_args[0]
ret = ffi.new(result + "*", received_return)
check(ret, returned_value)
|