1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
|
import _rawffi
from _rawffi import alt as _ffi
import sys
try: from __pypy__ import builtinify
except ImportError: builtinify = lambda f: f
keepalive_key = str # XXX fix this when provided with test
def ensure_objects(where):
try:
ensure = where._ensure_objects
except AttributeError:
return None
return ensure()
def store_reference(where, base_key, target):
if '_index' not in where.__dict__:
# shortcut
where._ensure_objects()[str(base_key)] = target
return
key = [base_key]
while '_index' in where.__dict__:
key.append(where.__dict__['_index'])
where = where.__dict__['_base']
real_key = ":".join([str(i) for i in key])
where._ensure_objects()[real_key] = target
class ArgumentError(Exception):
pass
class COMError(Exception):
"Raised when a COM method call failed."
def __init__(self, hresult, text, details):
self.args = (hresult, text, details)
self.hresult = hresult
self.text = text
self.details = details
class _CDataMeta(type):
def from_param(self, value):
if isinstance(value, self):
return value
try:
as_parameter = value._as_parameter_
except AttributeError:
raise TypeError("expected %s instance instead of %s" % (
self.__name__, type(value).__name__))
else:
return self.from_param(as_parameter)
def _build_ffiargtype(self):
return _shape_to_ffi_type(self._ffiargshape_)
def get_ffi_argtype(self):
if self._ffiargtype:
return self._ffiargtype
self._ffiargtype = self._build_ffiargtype()
return self._ffiargtype
def _CData_output(self, resbuffer, base=None, index=-1):
#assert isinstance(resbuffer, _rawffi.ArrayInstance)
"""Used when data exits ctypes and goes into user code.
'resbuffer' is a _rawffi array of length 1 containing the value,
and this returns a general Python object that corresponds.
"""
res = object.__new__(self)
res.__class__ = self
res.__dict__['_buffer'] = resbuffer
if base is not None:
res.__dict__['_base'] = base
res.__dict__['_index'] = index
return res
def _CData_retval(self, resbuffer):
return self._CData_output(resbuffer)
def __mul__(self, other):
from _ctypes.array import create_array_type
return create_array_type(self, other)
__rmul__ = __mul__
def _is_pointer_like(self):
return False
def in_dll(self, dll, name):
return self.from_address(dll.__pypy_dll__.getaddressindll(name))
def from_buffer(self, obj, offset=0):
size = self._sizeofinstances()
if isinstance(obj, (str, unicode)):
# hack, buffer(str) will always return a readonly buffer.
# CPython calls PyObject_AsWriteBuffer(...) here!
# str cannot be modified, thus raise a type error in this case
raise TypeError("Cannot use %s as modifiable buffer" % str(type(obj)))
# why not just call memoryview(obj)[offset:]?
# array in Python 2.7 does not support the buffer protocol and will
# fail, even though buffer is supported
buf = buffer(obj, offset, size)
if len(buf) < size:
raise ValueError(
"Buffer size too small (%d instead of at least %d bytes)"
% (len(buf) + offset, size + offset))
raw_addr = buf._pypy_raw_address()
result = self.from_address(raw_addr)
objects = result._ensure_objects()
if objects is not None:
objects['ffffffff'] = obj
else: # case e.g. of a primitive type like c_int
result._objects = obj
return result
def from_buffer_copy(self, obj, offset=0):
size = self._sizeofinstances()
buf = buffer(obj, offset, size)
if len(buf) < size:
raise ValueError(
"Buffer size too small (%d instead of at least %d bytes)"
% (len(buf) + offset, size + offset))
result = self._newowninstance_()
dest = result._buffer.buffer
try:
raw_addr = buf._pypy_raw_address()
except ValueError:
_rawffi.rawstring2charp(dest, buf)
else:
from ctypes import memmove
memmove(dest, raw_addr, size)
return result
def _newowninstance_(self):
result = self.__new__(self)
result._init_no_arg_()
return result
class CArgObject(object):
""" simple wrapper around buffer, just for the case of freeing
it afterwards
"""
def __init__(self, obj, buffer):
self._obj = obj
self._buffer = buffer
def __del__(self):
self._buffer.free()
self._buffer = None
def __repr__(self):
return '<CArgObject %r>' % (self._obj,)
def __eq__(self, other):
return self._obj == other
def __ne__(self, other):
return self._obj != other
class _CData(object):
""" The most basic object for all ctypes types
"""
__metaclass__ = _CDataMeta
_objects = None
_ffiargtype = None
def __init__(self, *args, **kwds):
raise TypeError("%s has no type" % (type(self),))
_init_no_arg_ = __init__
def _ensure_objects(self):
if '_objects' not in self.__dict__:
if '_index' in self.__dict__:
return None
self.__dict__['_objects'] = {}
return self._objects
def __ctypes_from_outparam__(self):
return self
def _get_buffer_for_param(self):
return self
def _get_buffer_value(self):
return self._buffer[0]
def _copy_to(self, addr):
target = type(self).from_address(addr)._buffer
target[0] = self._get_buffer_value()
def _to_ffi_param(self):
if self.__class__._is_pointer_like():
return self._get_buffer_value()
else:
return self.value
def __buffer__(self, flags):
return buffer(self._buffer)
def _get_b_base(self):
try:
return self._base
except AttributeError:
return None
_b_base_ = property(_get_b_base)
_b_needsfree_ = False
@builtinify
def sizeof(tp):
if not isinstance(tp, _CDataMeta):
if isinstance(tp, _CData):
tp = type(tp)
else:
raise TypeError("ctypes type or instance expected, got %r" % (
type(tp).__name__,))
return tp._sizeofinstances()
@builtinify
def alignment(tp):
if not isinstance(tp, _CDataMeta):
if isinstance(tp, _CData):
tp = type(tp)
else:
raise TypeError("ctypes type or instance expected, got %r" % (
type(tp).__name__,))
return tp._alignmentofinstances()
@builtinify
def byref(cdata, offset=0):
# "pointer" is imported at the end of this module to avoid circular
# imports
ptr = pointer(cdata)
if offset != 0:
ptr._buffer[0] += offset
return ptr
def cdata_from_address(self, address):
# fix the address: turn it into as unsigned, in case it's a negative number
address = address & (sys.maxint * 2 + 1)
instance = self.__new__(self)
lgt = getattr(self, '_length_', 1)
instance._buffer = self._ffiarray.fromaddress(address, lgt)
return instance
@builtinify
def addressof(tp):
return tp._buffer.buffer
# ----------------------------------------------------------------------
def is_struct_shape(shape):
# see the corresponding code to set the shape in
# _ctypes.structure._set_shape
return (isinstance(shape, tuple) and
len(shape) == 2 and
isinstance(shape[0], _rawffi.Structure) and
shape[1] == 1)
def _shape_to_ffi_type(shape):
try:
return _shape_to_ffi_type.typemap[shape]
except KeyError:
pass
if is_struct_shape(shape):
return shape[0].get_ffi_type()
#
assert False, 'unknown shape %s' % (shape,)
_shape_to_ffi_type.typemap = {
'c' : _ffi.types.char,
'b' : _ffi.types.sbyte,
'B' : _ffi.types.ubyte,
'h' : _ffi.types.sshort,
'u' : _ffi.types.unichar,
'H' : _ffi.types.ushort,
'i' : _ffi.types.sint,
'I' : _ffi.types.uint,
'l' : _ffi.types.slong,
'L' : _ffi.types.ulong,
'q' : _ffi.types.slonglong,
'Q' : _ffi.types.ulonglong,
'f' : _ffi.types.float,
'd' : _ffi.types.double,
's' : _ffi.types.void_p,
'P' : _ffi.types.void_p,
'z' : _ffi.types.void_p,
'O' : _ffi.types.void_p,
'Z' : _ffi.types.void_p,
'X' : _ffi.types.void_p,
'v' : _ffi.types.sshort,
'?' : _ffi.types.ubyte,
}
# called from primitive.py, pointer.py, array.py
def as_ffi_pointer(value, ffitype):
my_ffitype = type(value).get_ffi_argtype()
# for now, we always allow types.pointer, else a lot of tests
# break. We need to rethink how pointers are represented, though
if my_ffitype is not ffitype and ffitype is not _ffi.types.void_p:
raise ArgumentError("expected %s instance, got %s" % (type(value),
ffitype))
return value._get_buffer_value()
# used by "byref"
from _ctypes.pointer import pointer
|