1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
|
from rpython.rtyper.lltypesystem import rffi, lltype
from rpython.rlib import rgc # Force registration of gc.collect
from rpython.rlib.buffer import RawBuffer
from rpython.rlib.rarithmetic import widen
from pypy.interpreter.error import oefmt
from pypy.interpreter.buffer import BufferView
from pypy.module.cpyext.api import (
cpython_api, Py_buffer, Py_ssize_t, Py_ssize_tP, CONST_STRINGP, cts,
generic_cpy_call,
PyBUF_WRITABLE, PyBUF_FORMAT, PyBUF_ND, PyBUF_STRIDES, PyBUF_SIMPLE)
from pypy.module.cpyext.typeobjectdefs import releasebufferproc
from pypy.module.cpyext.pyobject import PyObject, incref, decref, as_pyobj
class CBuffer(RawBuffer):
    """Raw buffer that reads and writes through the ``ptr`` of a view.

    The wrapped ``view`` must provide ``ptr``, ``readonly`` and
    ``getlength()``.  No bounds checking is performed here.
    """
    _immutable_ = True

    def __init__(self, view):
        self.readonly = view.readonly
        self.view = view

    def getlength(self):
        """Return the length reported by the wrapped view."""
        view = self.view
        return view.getlength()

    def getitem(self, index):
        """Return the character stored at position ``index``."""
        data = self.view.ptr
        return data[index]

    def getslice(self, start, step, size):
        """Return ``size`` characters beginning at ``start`` as a string.

        Only contiguous slices are supported, hence the assertion on
        ``step``.
        """
        assert step == 1
        base = cts.cast('char *', self.view.ptr)
        return rffi.charpsize2str(rffi.ptradd(base, start), size)

    def setitem(self, index, char):
        """Store ``char`` at position ``index``."""
        data = self.view.ptr
        data[index] = char

    def setslice(self, index, s):
        """Copy the whole string ``s`` into the memory starting at ``index``."""
        assert s is not None
        base = cts.cast('char *', self.view.ptr)
        target = rffi.ptradd(base, index)
        rffi.str2chararray(s, target, len(s))

    def get_raw_address(self):
        """Return the view's data pointer cast to ``char *``."""
        return cts.cast('char *', self.view.ptr)
class CPyBuffer(BufferView):
    """Buffer view over memory exported by a C-level object.

    Similar to Py_buffer: it stores the data pointer, the total size, the
    format string, the shape, the strides, the number of dimensions and the
    item size of the exported memory.
    """
    _immutable_ = True

    def __init__(self, space, ptr, size, w_obj, format='B', shape=None,
                 strides=None, ndim=1, itemsize=1, readonly=True,
                 needs_decref=False,
                 releasebufferproc=rffi.cast(releasebufferproc, 0)):
        """Record the description of the exported memory.

        ``shape`` and ``strides`` may be omitted when ``ndim`` is 0 or 1.
        For ``ndim > 1`` a ``shape`` of length ``ndim`` is required; missing
        ``strides`` are then derived from ``shape`` and ``itemsize``
        assuming a C-contiguous layout.
        """
        self.space = space
        self.ptr = ptr
        self.size = size
        self.w_obj = w_obj  # kept alive
        self.pyobj = as_pyobj(space, w_obj)
        self.format = format
        self.ndim = ndim
        self.itemsize = itemsize
        # cf. Objects/memoryobject.c:init_shape_strides()
        if ndim == 0:
            self.shape = []
            self.strides = []
        elif ndim == 1:
            if shape is None:
                self.shape = [size // itemsize]
            else:
                self.shape = shape
            if strides is None:
                self.strides = [itemsize]
            else:
                self.strides = strides
        else:
            assert len(shape) == ndim
            self.shape = shape
            if strides is None:
                # cf. Objects/memoryobject.c:init_strides_from_shape():
                # the last dimension advances by one item, every earlier
                # dimension by the full extent of the dimension after it.
                # Without this, releasebuffer() would index a None.
                strides = [itemsize] * ndim
                for i in range(ndim - 2, -1, -1):
                    strides[i] = strides[i + 1] * shape[i + 1]
            self.strides = strides
        self.readonly = readonly
        self.needs_decref = needs_decref
        self.releasebufferproc = releasebufferproc

    def releasebuffer(self):
        """Release the exported buffer; any later call is a no-op.

        When ``needs_decref`` is set, the release procedure (if there is
        one) is called with a temporary Py_buffer filled in from this
        view, and then ``pyobj`` is decref'ed.
        """
        if not self.pyobj:
            # do not call twice
            return
        if self.needs_decref:
            if self.releasebufferproc:
                func_target = rffi.cast(releasebufferproc,
                                        self.releasebufferproc)
                size = rffi.sizeof(cts.gettype('Py_buffer'))
                pybuf = lltype.malloc(rffi.VOIDP.TO, size, flavor='raw',
                                      zero=True)
                pybuf = cts.cast('Py_buffer*', pybuf)
                pybuf.c_buf = self.ptr
                pybuf.c_len = self.size
                pybuf.c_ndim = cts.cast('int', self.ndim)
                # shape/strides point at the arrays embedded in the struct
                pybuf.c_shape = cts.cast('Py_ssize_t*', pybuf.c__shape)
                pybuf.c_strides = cts.cast('Py_ssize_t*', pybuf.c__strides)
                pybuf.c_obj = self.pyobj
                for i in range(self.ndim):
                    pybuf.c_shape[i] = self.shape[i]
                    pybuf.c_strides[i] = self.strides[i]
                fmt = rffi.str2charp(self.format if self.format else "B")
                try:
                    pybuf.c_format = fmt
                    generic_cpy_call(self.space, func_target, self.pyobj,
                                     pybuf)
                finally:
                    # the temporary format string and struct are freed
                    # even if the release procedure raises
                    lltype.free(fmt, flavor='raw')
                    lltype.free(pybuf, flavor='raw')
            # one decref whether or not a release procedure was called
            decref(self.space, self.pyobj)
        self.pyobj = lltype.nullptr(PyObject.TO)
        self.w_obj = None

    def getlength(self):
        """Return the total size given at construction."""
        return self.size

    def getbytes(self, start, size):
        """Return ``size`` characters starting at ``start`` as a string."""
        return ''.join([self.ptr[i] for i in range(start, start + size)])

    def setbytes(self, start, string):
        """Copy ``string`` into the memory beginning at ``start``.

        No bounds or read-only checks are performed.
        """
        for i in range(len(string)):
            self.ptr[start + i] = string[i]

    def as_str(self):
        """Return the contents as a string, via a CBuffer wrapper."""
        return CBuffer(self).as_str()

    def as_readbuf(self):
        """Return a CBuffer wrapping this view."""
        return CBuffer(self)

    def as_writebuf(self):
        """Return a CBuffer wrapping this view; it must not be read-only."""
        assert not self.readonly
        return CBuffer(self)

    def get_raw_address(self):
        """Return the data pointer cast to ``rffi.CCHARP``."""
        return rffi.cast(rffi.CCHARP, self.ptr)

    def getformat(self):
        """Return the format string."""
        return self.format

    def getshape(self):
        """Return the list of per-dimension lengths."""
        return self.shape

    def getstrides(self):
        """Return the list of per-dimension strides."""
        return self.strides

    def getitemsize(self):
        """Return the item size."""
        return self.itemsize

    def getndim(self):
        """Return the number of dimensions."""
        return self.ndim
class FQ(rgc.FinalizerQueue):
    """Finalizer queue for CPyBuffer: releases every buffer reported dead."""
    Class = CPyBuffer

    def finalizer_trigger(self):
        """Drain the queue, calling releasebuffer() on each dead buffer."""
        dead = self.next_dead()
        while dead:
            if dead.releasebuffer:
                dead.releasebuffer()
            dead = self.next_dead()


# module-level queue instance
fq = FQ()
@cpython_api([PyObject, CONST_STRINGP, Py_ssize_tP], rffi.INT_real, error=-1)
def PyObject_AsCharBuffer(space, obj, bufferp, sizep):
    """Expose the memory of obj as a character buffer.

    The type of obj must provide a getbuffer slot; otherwise a TypeError
    is raised.  The buffer is requested with PyBUF_SIMPLE.  On success,
    bufferp[0] receives the memory location, sizep[0] receives the buffer
    length, and 0 is returned.  Returns -1 if the getbuffer call returns -1.
    """
    buffer_procs = obj.c_ob_type.c_tp_as_buffer
    if not buffer_procs or not buffer_procs.c_bf_getbuffer:
        raise oefmt(space.w_TypeError,
                    "expected an object with the buffer interface")

    with lltype.scoped_alloc(Py_buffer) as view:
        simple_flags = rffi.cast(rffi.INT_real, PyBUF_SIMPLE)
        status = generic_cpy_call(space, buffer_procs.c_bf_getbuffer,
                                  obj, view, simple_flags)
        if rffi.cast(lltype.Signed, status) == -1:
            return -1

        bufferp[0] = rffi.cast(rffi.CCHARP, view.c_buf)
        sizep[0] = view.c_len

        # The view is handed back to the exporter right away; the decref
        # of view.c_obj runs even if the release slot raises.
        release = buffer_procs.c_bf_releasebuffer
        try:
            if release:
                generic_cpy_call(space, release, obj, view)
        finally:
            decref(space, view.c_obj)
    return 0
# Format string shared by every view filled in with PyBUF_FORMAT.
DEFAULT_FMT = rffi.str2charp("B")


@cpython_api([lltype.Ptr(Py_buffer), PyObject, rffi.VOIDP, Py_ssize_t,
              rffi.INT_real, rffi.INT_real], rffi.INT, error=-1)
def PyBuffer_FillInfo(space, view, obj, buf, length, readonly, flags):
    """
    Fill in ``view`` for an exporter sharing one contiguous chunk of
    "unsigned bytes" of ``length`` bytes located at ``buf``.

    Raises ValueError when a writable buffer is requested for read-only
    memory.  Returns 0 on success.
    """
    flags = widen(flags)
    readonly = widen(readonly)
    if readonly and (flags & PyBUF_WRITABLE):
        raise oefmt(space.w_ValueError, "Object is not writable")

    view.c_obj = obj
    if obj:
        incref(space, obj)
    view.c_buf = buf
    view.c_len = length
    view.c_itemsize = 1
    rffi.setintfield(view, 'c_ndim', 1)
    rffi.setintfield(view, 'c_readonly', readonly)

    if (flags & PyBUF_FORMAT) == PyBUF_FORMAT:
        # NB: this needs to be a static string, because nothing frees it
        view.c_format = DEFAULT_FMT
    else:
        view.c_format = lltype.nullptr(rffi.CCHARP.TO)

    if (flags & PyBUF_ND) == PyBUF_ND:
        # point at the array embedded in the struct and store the length
        view.c_shape = rffi.cast(Py_ssize_tP, view.c__shape)
        view.c_shape[0] = view.c_len
    else:
        view.c_shape = lltype.nullptr(Py_ssize_tP.TO)

    if (flags & PyBUF_STRIDES) == PyBUF_STRIDES:
        # point at the array embedded in the struct and store the item size
        view.c_strides = rffi.cast(Py_ssize_tP, view.c__strides)
        view.c_strides[0] = view.c_itemsize
    else:
        view.c_strides = lltype.nullptr(Py_ssize_tP.TO)

    view.c_suboffsets = lltype.nullptr(Py_ssize_tP.TO)
    view.c_internal = lltype.nullptr(rffi.VOIDP.TO)
    return 0
|