File: interp_struct.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (270 lines) | stat: -rw-r--r-- 9,971 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
from rpython.rlib import jit
from rpython.rlib.buffer import SubBuffer
from rpython.rlib.mutbuffer import MutableStringBuffer
from rpython.rlib.rarithmetic import r_uint, widen
from rpython.rlib.rstruct.error import StructError, StructOverflowError
from rpython.rlib.rstruct.formatiterator import CalcSizeFormatIterator

from pypy.interpreter.baseobjspace import W_Root
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.interpreter.error import OperationError, oefmt
from pypy.interpreter.typedef import TypeDef, interp_attrproperty
from pypy.interpreter.typedef import make_weakref_descr
from pypy.module.struct.formatiterator import (
    PackFormatIterator, UnpackFormatIterator
)


class Cache:
    def __init__(self, space):
        self.error = space.new_exception_class("struct.error", space.w_Exception)


def get_error(space):
    return space.fromcache(Cache).error


def _calcsize(space, format):
    fmtiter = CalcSizeFormatIterator()
    try:
        fmtiter.interpret(format)
    except StructOverflowError as e:
        raise OperationError(space.w_OverflowError, space.newtext(e.msg))
    except StructError as e:
        raise OperationError(get_error(space), space.newtext(e.msg))
    return fmtiter.totalsize


def text_or_bytes_w(space, w_input):
    # why does CPython do this??
    if space.isinstance_w(w_input, space.w_bytes):
        return space.bytes_w(w_input)
    else:
        return space.text_w(w_input)

def calcsize(space, w_format):
    """Return size of C struct described by format string fmt."""
    format = text_or_bytes_w(space, w_format)
    return space.newint(_calcsize(space, format))


def _pack(space, format, args_w):
    """Return string containing values v1, v2, ... packed according to fmt."""
    size = _calcsize(space, format)
    wbuf = MutableStringBuffer(size)
    fmtiter = PackFormatIterator(space, wbuf, args_w)
    try:
        fmtiter.interpret(format)
    except StructOverflowError as e:
        raise OperationError(space.w_OverflowError, space.newtext(e.msg))
    except StructError as e:
        raise OperationError(get_error(space), space.newtext(e.msg))
    assert fmtiter.pos == wbuf.getlength(), 'missing .advance() or wrong calcsize()'
    return wbuf.finish()


def pack(space, w_format, args_w):
    """Return string containing values v1, v2, ... packed according to fmt."""
    format = text_or_bytes_w(space, w_format)
    return do_pack(space, format, args_w)

def do_pack(space, format, args_w):
    return space.newbytes(_pack(space, format, args_w))


@unwrap_spec(offset=int)
def pack_into(space, w_format, w_buffer, offset, args_w):
    """ Pack the values v1, v2, ... according to fmt.
Write the packed bytes into the writable buffer buf starting at offset
    """
    format = text_or_bytes_w(space, w_format)
    return do_pack_into(space, format, w_buffer, offset, args_w)

def do_pack_into(space, format, w_buffer, offset, args_w):
    """ Pack the values v1, v2, ... according to fmt.
Write the packed bytes into the writable buffer buf starting at offset
    """
    size = _calcsize(space, format)
    buf = space.writebuf_w(w_buffer)
    buflen = buf.getlength()
    if offset < 0:
        # Check that negative offset is low enough to fit data
        if offset + size > 0:
            raise oefmt(get_error(space),
                        "no space to pack %d bytes at offset %d",
                        size,
                        offset)
        # Check that negative offset is not crossing buffer boundary
        if offset + buflen < 0:
            raise oefmt(get_error(space),
                        "offset %d out of range for %d-byte buffer",
                        offset,
                        buflen)
        offset += buflen
    if (buflen - offset) < size:
        raise oefmt(get_error(space),
                    "pack_into requires a buffer of at least %d bytes for "
                    "packing %d bytes at offset %d "
                    "(actual buffer size is %d)",
                    r_uint(size + offset),
                    size,
                    offset,
                    buflen)
    #
    wbuf = SubBuffer(buf, offset, size)
    fmtiter = PackFormatIterator(space, wbuf, args_w)
    try:
        fmtiter.interpret(format)
    except StructOverflowError as e:
        raise OperationError(space.w_OverflowError, space.newtext(e.msg))
    except StructError as e:
        raise OperationError(get_error(space), space.newtext(e.msg))


def _unpack(space, format, buf):
    fmtiter = UnpackFormatIterator(space, buf)
    try:
        fmtiter.interpret(format)
    except StructOverflowError as e:
        raise OperationError(space.w_OverflowError, space.newtext(e.msg))
    except StructError as e:
        raise OperationError(get_error(space), space.newtext(e.msg))
    return space.newtuple(fmtiter.result_w[:])


def unpack(space, w_format, w_str):
    format = text_or_bytes_w(space, w_format)
    return do_unpack(space, format, w_str)

def do_unpack(space, format, w_str):
    buf = space.readbuf_w(w_str)
    return _unpack(space, format, buf)


@unwrap_spec(offset=int)
def unpack_from(space, w_format, w_buffer, offset=0):
    """Unpack the buffer, containing packed C structure data, according to
fmt, starting at offset. Requires len(buffer[offset:]) >= calcsize(fmt)."""
    format = text_or_bytes_w(space, w_format)
    return do_unpack_from(space, format, w_buffer, offset)

def do_unpack_from(space, format, w_buffer, offset=0):
    """Unpack the buffer, containing packed C structure data, according to
fmt, starting at offset. Requires len(buffer[offset:]) >= calcsize(fmt)."""
    s_size = _calcsize(space, format)
    buf = space.readbuf_w(w_buffer)
    buf_length = buf.getlength()
    if offset < 0:
        if offset + s_size > 0:
            raise oefmt(get_error(space),
                    "not enough data to unpack %d bytes at offset %d",
                    s_size, offset)
        if offset + buf_length < 0:
            raise oefmt(get_error(space),
                    "offset %d out of range for %d-byte buffer",
                    offset, buf_length)
        offset += buf_length
    if buf_length - offset < s_size:
        raise oefmt(get_error(space),
                    "unpack_from requires a buffer of at least %d bytes for "
                    "unpacking %d bytes at offset %d "
                    "(actual buffer size is %d)",
                    r_uint(s_size + offset), s_size, offset, buf_length)
    buf = SubBuffer(buf, offset, s_size)
    return _unpack(space, format, buf)


class W_UnpackIter(W_Root):
    def __init__(self, space, w_struct, w_buffer):
        buf = space.readbuf_w(w_buffer)
        if w_struct.size <= 0:
            raise oefmt(get_error(space),
                "cannot iteratively unpack with a struct of length %d",
                w_struct.size)
        if buf.getlength() % w_struct.size != 0:
            raise oefmt(get_error(space),
                "iterative unpacking requires a bytes length multiple of %d",
                w_struct.size)
        self.w_struct = w_struct
        self.buf = buf
        self.index = 0

    def descr_iter(self, space):
        return self

    def descr_next(self, space):
        if self.w_struct is None:
            raise OperationError(space.w_StopIteration, space.w_None)
        if self.index >= self.buf.getlength():
            raise OperationError(space.w_StopIteration, space.w_None)
        size = self.w_struct.size
        buf = SubBuffer(self.buf, self.index, size)
        w_res = _unpack(space, self.w_struct.format, buf)
        self.index += size
        return w_res

    def descr_length_hint(self, space):
        if self.w_struct is None:
            return space.newint(0)
        length = (self.buf.getlength() - self.index) // self.w_struct.size
        return space.newint(length)


class W_Struct(W_Root):
    _immutable_fields_ = ["format", "size"]

    format = ""
    size = -1

    def descr__new__(space, w_subtype, __args__):
        return space.allocate_instance(W_Struct, w_subtype)

    def descr__init__(self, space, w_format):
        format = text_or_bytes_w(space, w_format)
        self.format = format
        self.size = _calcsize(space, format)

    def descr_pack(self, space, args_w):
        return do_pack(space, jit.promote_string(self.format), args_w)

    @unwrap_spec(offset=int)
    def descr_pack_into(self, space, w_buffer, offset, args_w):
        return do_pack_into(space, jit.promote_string(self.format), w_buffer, offset, args_w)

    def descr_unpack(self, space, w_str):
        return do_unpack(space, jit.promote_string(self.format), w_str)

    @unwrap_spec(offset=int)
    def descr_unpack_from(self, space, w_buffer, offset=0):
        return do_unpack_from(space, jit.promote_string(self.format), w_buffer, offset)

    def descr_iter_unpack(self, space, w_buffer):
        return W_UnpackIter(space, self, w_buffer)

W_Struct.typedef = TypeDef("Struct",
    __new__=interp2app(W_Struct.descr__new__.im_func),
    __init__=interp2app(W_Struct.descr__init__),
    format=interp_attrproperty("format", cls=W_Struct, wrapfn="newtext"),
    size=interp_attrproperty("size", cls=W_Struct, wrapfn="newint"),

    pack=interp2app(W_Struct.descr_pack),
    unpack=interp2app(W_Struct.descr_unpack),
    pack_into=interp2app(W_Struct.descr_pack_into),
    unpack_from=interp2app(W_Struct.descr_unpack_from),
    iter_unpack=interp2app(W_Struct.descr_iter_unpack),
    __weakref__=make_weakref_descr(W_Struct),
)

W_UnpackIter.typedef = TypeDef("unpack_iterator",
    __iter__=interp2app(W_UnpackIter.descr_iter),
    __next__=interp2app(W_UnpackIter.descr_next),
    __length_hint__=interp2app(W_UnpackIter.descr_length_hint)
)

def iter_unpack(space, w_format, w_buffer):
    w_struct = W_Struct()
    w_struct.descr__init__(space, w_format)
    return W_UnpackIter(space, w_struct, w_buffer)

def clearcache(space):
    """No-op on PyPy"""