File: basics.py

package info (click to toggle)
pypy 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 107,216 kB
  • sloc: python: 1,201,787; ansic: 62,419; asm: 5,169; cpp: 3,017; sh: 2,534; makefile: 545; xml: 243; lisp: 45; awk: 4
file content (310 lines) | stat: -rw-r--r-- 9,367 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
import _rawffi
from _rawffi import alt as _ffi
import sys

try: from __pypy__ import builtinify
except ImportError: builtinify = lambda f: f

keepalive_key = str # XXX fix this when provided with test

def ensure_objects(where):
    try:
        ensure = where._ensure_objects
    except AttributeError:
        return None
    return ensure()

def store_reference(where, base_key, target):
    if '_index' not in where.__dict__:
        # shortcut
        where._ensure_objects()[str(base_key)] = target
        return
    key = [base_key]
    while '_index' in where.__dict__:
        key.append(where.__dict__['_index'])
        where = where.__dict__['_base']
    real_key = ":".join([str(i) for i in key])
    where._ensure_objects()[real_key] = target

class ArgumentError(Exception):
    pass

class COMError(Exception):
    "Raised when a COM method call failed."
    def __init__(self, hresult, text, details):
        self.args = (hresult, text, details)
        self.hresult = hresult
        self.text = text
        self.details = details

class _CDataMeta(type):
    def from_param(self, value):
        if isinstance(value, self):
            return value
        try:
            as_parameter = value._as_parameter_
        except AttributeError:
            raise TypeError("expected %s instance instead of %s" % (
                self.__name__, type(value).__name__))
        else:
            return self.from_param(as_parameter)

    def _build_ffiargtype(self):
        return _shape_to_ffi_type(self._ffiargshape_)

    def get_ffi_argtype(self):
        if self._ffiargtype:
            return self._ffiargtype
        self._ffiargtype = self._build_ffiargtype()
        return self._ffiargtype

    def _CData_output(self, resbuffer, base=None, index=-1):
        #assert isinstance(resbuffer, _rawffi.ArrayInstance)
        """Used when data exits ctypes and goes into user code.
        'resbuffer' is a _rawffi array of length 1 containing the value,
        and this returns a general Python object that corresponds.
        """
        res = object.__new__(self)
        res.__class__ = self
        res.__dict__['_buffer'] = resbuffer
        if base is not None:
            res.__dict__['_base'] = base
            res.__dict__['_index'] = index
        return res

    def _CData_retval(self, resbuffer):
        return self._CData_output(resbuffer)

    def __mul__(self, other):
        from _ctypes.array import create_array_type
        return create_array_type(self, other)

    __rmul__ = __mul__

    def _is_pointer_like(self):
        return False

    def in_dll(self, dll, name):
        return self.from_address(dll.__pypy_dll__.getaddressindll(name))

    def from_buffer(self, obj, offset=0):
        size = self._sizeofinstances()
        if isinstance(obj, (str, unicode)):
            # hack, buffer(str) will always return a readonly buffer.
            # CPython calls PyObject_AsWriteBuffer(...) here!
            # str cannot be modified, thus raise a type error in this case
            raise TypeError("Cannot use %s as modifiable buffer" % str(type(obj)))

        # why not just call memoryview(obj)[offset:]?
        # array in Python 2.7 does not support the buffer protocol and will
        # fail, even though buffer is supported
        buf = buffer(obj, offset, size)

        if len(buf) < size:
            raise ValueError(
                "Buffer size too small (%d instead of at least %d bytes)"
                % (len(buf) + offset, size + offset))
        raw_addr = buf._pypy_raw_address()
        result = self.from_address(raw_addr)
        objects = result._ensure_objects()
        if objects is not None:
            objects['ffffffff'] = obj
        else:   # case e.g. of a primitive type like c_int
            result._objects = obj
        return result

    def from_buffer_copy(self, obj, offset=0):
        size = self._sizeofinstances()
        buf = buffer(obj, offset, size)
        if len(buf) < size:
            raise ValueError(
                "Buffer size too small (%d instead of at least %d bytes)"
                % (len(buf) + offset, size + offset))
        result = self._newowninstance_()
        dest = result._buffer.buffer
        try:
            raw_addr = buf._pypy_raw_address()
        except ValueError:
            _rawffi.rawstring2charp(dest, buf)
        else:
            from ctypes import memmove
            memmove(dest, raw_addr, size)
        return result

    def _newowninstance_(self):
        result = self.__new__(self)
        result._init_no_arg_()
        return result


class CArgObject(object):
    """ simple wrapper around buffer, just for the case of freeing
    it afterwards
    """
    def __init__(self, obj, buffer):
        self._obj = obj
        self._buffer = buffer

    def __del__(self):
        self._buffer.free()
        self._buffer = None

    def __repr__(self):
        return '<CArgObject %r>' % (self._obj,)

    def __eq__(self, other):
        return self._obj == other

    def __ne__(self, other):
        return self._obj != other

class _CData(object):
    """ The most basic object for all ctypes types
    """
    __metaclass__ = _CDataMeta
    _objects = None
    _ffiargtype = None

    def __init__(self, *args, **kwds):
        raise TypeError("%s has no type" % (type(self),))
    _init_no_arg_ = __init__

    def _ensure_objects(self):
        if '_objects' not in self.__dict__:
            if '_index' in self.__dict__:
                return None
            self.__dict__['_objects'] = {}
        return self._objects

    def __ctypes_from_outparam__(self):
        return self

    def _get_buffer_for_param(self):
        return self

    def _get_buffer_value(self):
        return self._buffer[0]

    def _copy_to(self, addr):
        target = type(self).from_address(addr)._buffer
        target[0] = self._get_buffer_value()

    def _to_ffi_param(self):
        if self.__class__._is_pointer_like():
            return self._get_buffer_value()
        else:
            return self.value

    def __buffer__(self, flags):
        return buffer(self._buffer)

    def _get_b_base(self):
        try:
            return self._base
        except AttributeError:
            return None
    _b_base_ = property(_get_b_base)
    _b_needsfree_ = False

@builtinify
def sizeof(tp):
    if not isinstance(tp, _CDataMeta):
        if isinstance(tp, _CData):
            tp = type(tp)
        else:
            raise TypeError("ctypes type or instance expected, got %r" % (
                type(tp).__name__,))
    return tp._sizeofinstances()

@builtinify
def alignment(tp):
    if not isinstance(tp, _CDataMeta):
        if isinstance(tp, _CData):
            tp = type(tp)
        else:
            raise TypeError("ctypes type or instance expected, got %r" % (
                type(tp).__name__,))
    return tp._alignmentofinstances()

@builtinify
def byref(cdata, offset=0):
    # "pointer" is imported at the end of this module to avoid circular
    # imports
    ptr = pointer(cdata)
    if offset != 0:
        ptr._buffer[0] += offset
    return ptr

def cdata_from_address(self, address):
    # fix the address: turn it into as unsigned, in case it's a negative number
    address = address & (sys.maxint * 2 + 1)
    instance = self.__new__(self)
    lgt = getattr(self, '_length_', 1)
    instance._buffer = self._ffiarray.fromaddress(address, lgt)
    return instance

@builtinify
def addressof(tp):
    return tp._buffer.buffer


# ----------------------------------------------------------------------

def is_struct_shape(shape):
    # see the corresponding code to set the shape in
    # _ctypes.structure._set_shape
    return (isinstance(shape, tuple) and
            len(shape) == 2 and
            isinstance(shape[0], _rawffi.Structure) and
            shape[1] == 1)

def _shape_to_ffi_type(shape):
    try:
        return _shape_to_ffi_type.typemap[shape]
    except KeyError:
        pass
    if is_struct_shape(shape):
        return shape[0].get_ffi_type()
    #
    assert False, 'unknown shape %s' % (shape,)


_shape_to_ffi_type.typemap =  {
    'c' : _ffi.types.char,
    'b' : _ffi.types.sbyte,
    'B' : _ffi.types.ubyte,
    'h' : _ffi.types.sshort,
    'u' : _ffi.types.unichar,
    'H' : _ffi.types.ushort,
    'i' : _ffi.types.sint,
    'I' : _ffi.types.uint,
    'l' : _ffi.types.slong,
    'L' : _ffi.types.ulong,
    'q' : _ffi.types.slonglong,
    'Q' : _ffi.types.ulonglong,
    'f' : _ffi.types.float,
    'd' : _ffi.types.double,
    's' : _ffi.types.void_p,
    'P' : _ffi.types.void_p,
    'z' : _ffi.types.void_p,
    'O' : _ffi.types.void_p,
    'Z' : _ffi.types.void_p,
    'X' : _ffi.types.void_p,
    'v' : _ffi.types.sshort,
    '?' : _ffi.types.ubyte,
    }


# called from primitive.py, pointer.py, array.py
def as_ffi_pointer(value, ffitype):
    my_ffitype = type(value).get_ffi_argtype()
    # for now, we always allow types.pointer, else a lot of tests
    # break. We need to rethink how pointers are represented, though
    if my_ffitype is not ffitype and ffitype is not _ffi.types.void_p:
        raise ArgumentError("expected %s instance, got %s" % (type(value),
                                                              ffitype))
    return value._get_buffer_value()


# used by "byref"
from _ctypes.pointer import pointer