File: buffer.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (106 lines) | stat: -rw-r--r-- 3,486 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from rpython.rtyper.lltypesystem import lltype
from rpython.rlib.buffer import LLBuffer
from rpython.rlib.rgc import FinalizerQueue

from pypy.interpreter.buffer import BufferView
from . import llapi

def setup_hpybuffer(handles):
    class HPyBuffer(BufferView):
        _immutable_ = True

        def __init__(self, ptr, size, w_owner, itemsize, readonly, ndim,
                     format, shape, strides):
            self.rawbuf = LLBuffer(llapi.cts.cast('char*', ptr), size)
            self.w_owner = w_owner
            self.format = format
            self.itemsize = itemsize
            # cf. Objects/memoryobject.c:init_shape_strides()
            if ndim == 0:
                self.shape = []
                self.strides = []
            elif ndim == 1:
                if shape is None:
                    self.shape = [size // itemsize]
                else:
                    self.shape = shape
                if strides is None:
                    self.strides = [itemsize]
                else:
                    self.strides = strides
            else:
                assert len(shape) == ndim
                self.shape = shape
                # XXX: missing init_strides_from_shape
                self.strides = strides
            self.readonly = readonly
            self.releasebufferproc = llapi.cts.cast('HPyFunc_releasebufferproc', 0)

        def releasebuffer(self):
            if self.w_owner is None:
                # don't call twice
                return
            if self.releasebufferproc:
                with lltype.scoped_alloc(llapi.cts.gettype('HPy_buffer')) as hpybuf:
                    hpybuf.c_buf = llapi.cts.cast('void*', self.get_raw_address())
                    hpybuf.c_len = self.getlength()
                    ndim = self.getndim()
                    hpybuf.c_ndim = llapi.cts.cast('int', ndim)
                    # XXX: hpybuf.c_shape, hpybuf.c_strides, ...
                    func = llapi.cts.cast(
                        'HPyFunc_releasebufferproc', self.releasebufferproc)
                    with handles.using(self.w_owner) as h_owner:
                        func(handles.ctx, h_owner, hpybuf)
            self.w_owner = None

        def getlength(self):
            return self.rawbuf.getlength()

        def get_raw_address(self):
            return self.rawbuf.get_raw_address()

        def as_str(self):
            return self.rawbuf.as_str()

        def getbytes(self, start, size):
            return self.rawbuf.getslice(start, 1, size)

        def setbytes(self, start, s):
            assert not self.readonly
            self.rawbuf.setslice(start, s)

        def as_readbuf(self):
            return self.rawbuf

        def as_writebuf(self):
            assert not self.readonly
            return self.rawbuf

        def getndim(self):
            return len(self.shape)

        def getformat(self):
            return self.format

        def getitemsize(self):
            return self.itemsize

        def getshape(self):
            return self.shape

        def getstrides(self):
            return self.strides

    class FQ(FinalizerQueue):
        Class = HPyBuffer

        def finalizer_trigger(self):
            while 1:
                hpybuf = self.next_dead()
                if not hpybuf:
                    break
                hpybuf.releasebuffer()

    HPyBuffer.__name__ += handles.cls_suffix
    handles.HPyBuffer = HPyBuffer
    handles.BUFFER_FQ = FQ()