1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
|
from rpython.rlib import jit
from rpython.rlib.buffer import SubBuffer
from rpython.rlib.mutbuffer import MutableStringBuffer
from rpython.rlib.rarithmetic import r_uint, widen
from rpython.rlib.rstruct.error import StructError, StructOverflowError
from rpython.rlib.rstruct.formatiterator import CalcSizeFormatIterator
from pypy.interpreter.baseobjspace import W_Root
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.interpreter.error import OperationError, oefmt
from pypy.interpreter.typedef import TypeDef, interp_attrproperty
from pypy.interpreter.typedef import make_weakref_descr
from pypy.module.struct.formatiterator import (
PackFormatIterator, UnpackFormatIterator
)
class Cache:
    """Holder for the app-level ``struct.error`` exception class.

    The exception class is created once, when the Cache is built for a
    given *space*, and stored on the ``error`` attribute.
    """

    def __init__(self, space):
        w_base = space.w_Exception
        self.error = space.new_exception_class("struct.error", w_base)
def get_error(space):
    """Return the ``struct.error`` exception class cached on *space*."""
    cache = space.fromcache(Cache)
    return cache.error
def _calcsize(space, format):
    """Return the total size computed for the struct format *format*.

    Errors reported by the format iterator are re-raised as app-level
    exceptions: StructOverflowError becomes OverflowError, any other
    StructError becomes struct.error.
    """
    sizer = CalcSizeFormatIterator()
    try:
        sizer.interpret(format)
    except StructOverflowError as exc:
        # NOTE(review): handled before StructError, presumably because it
        # is a subclass of it -- confirm against rpython.rlib.rstruct.error.
        w_exc_type = space.w_OverflowError
        raise OperationError(w_exc_type, space.newtext(exc.msg))
    except StructError as exc:
        w_exc_type = get_error(space)
        raise OperationError(w_exc_type, space.newtext(exc.msg))
    return sizer.totalsize
def text_or_bytes_w(space, w_input):
    """Unwrap *w_input* with bytes_w() if it is a bytes object, else text_w()."""
    # Both kinds of format object are accepted here; the original author
    # notes this mirrors (somewhat puzzling) CPython behaviour.
    is_bytes = space.isinstance_w(w_input, space.w_bytes)
    if not is_bytes:
        return space.text_w(w_input)
    return space.bytes_w(w_input)
def calcsize(space, w_format):
    """Return size of C struct described by format string fmt."""
    # Unwrap the format object, compute the size at interp-level, then
    # wrap the result as an app-level int.
    fmt = text_or_bytes_w(space, w_format)
    nbytes = _calcsize(space, fmt)
    return space.newint(nbytes)
def _pack(space, format, args_w):
    """Pack the wrapped values *args_w* according to *format*.

    The output buffer is sized up front with _calcsize().  Errors reported
    by the format iterator are re-raised as app-level OverflowError or
    struct.error.  Returns the finished contents of the output buffer.
    """
    nbytes = _calcsize(space, format)
    outbuf = MutableStringBuffer(nbytes)
    packer = PackFormatIterator(space, outbuf, args_w)
    try:
        packer.interpret(format)
    except StructOverflowError as exc:
        raise OperationError(space.w_OverflowError, space.newtext(exc.msg))
    except StructError as exc:
        raise OperationError(get_error(space), space.newtext(exc.msg))
    # Internal consistency check: the iterator position must match the
    # size of the buffer that was allocated from the computed size.
    assert packer.pos == outbuf.getlength(), 'missing .advance() or wrong calcsize()'
    return outbuf.finish()
def pack(space, w_format, args_w):
    """Return string containing values v1, v2, ... packed according to fmt."""
    # Unwrap the format object and hand over to the shared packing helper.
    return do_pack(space, text_or_bytes_w(space, w_format), args_w)
def do_pack(space, format, args_w):
    """Pack *args_w* according to *format* and wrap the result with newbytes()."""
    packed = _pack(space, format, args_w)
    return space.newbytes(packed)
@unwrap_spec(offset=int)
def pack_into(space, w_format, w_buffer, offset, args_w):
    """ Pack the values v1, v2, ... according to fmt.
    Write the packed bytes into the writable buffer buf starting at offset
    """
    # Only the format needs unwrapping here; all buffer and offset checks
    # are performed by do_pack_into().
    fmt = text_or_bytes_w(space, w_format)
    return do_pack_into(space, fmt, w_buffer, offset, args_w)
def do_pack_into(space, format, w_buffer, offset, args_w):
    """Pack *args_w* according to *format* into a writable buffer.

    The bytes are written into the buffer obtained from *w_buffer* with
    writebuf_w(), starting at *offset*.  A negative *offset* is shifted by
    the buffer length once it has been validated.

    Raises struct.error if the packed data does not fit at the requested
    position, and OverflowError / struct.error for errors reported by the
    format iterator.
    """
    nbytes = _calcsize(space, format)
    target = space.writebuf_w(w_buffer)
    total = target.getlength()
    if offset < 0:
        # The negative offset must be far enough from the end to leave
        # room for all of the packed data ...
        if offset + nbytes > 0:
            raise oefmt(get_error(space),
                        "no space to pack %d bytes at offset %d",
                        nbytes, offset)
        # ... and must not reach back past the start of the buffer.
        if offset + total < 0:
            raise oefmt(get_error(space),
                        "offset %d out of range for %d-byte buffer",
                        offset, total)
        offset += total
    # Bounds check for the (now non-negative, if it was valid) offset.
    available = total - offset
    if available < nbytes:
        raise oefmt(get_error(space),
                    "pack_into requires a buffer of at least %d bytes for "
                    "packing %d bytes at offset %d "
                    "(actual buffer size is %d)",
                    r_uint(nbytes + offset), nbytes, offset, total)
    # Pack straight into a window of exactly nbytes bytes of the target.
    window = SubBuffer(target, offset, nbytes)
    packer = PackFormatIterator(space, window, args_w)
    try:
        packer.interpret(format)
    except StructOverflowError as exc:
        raise OperationError(space.w_OverflowError, space.newtext(exc.msg))
    except StructError as exc:
        raise OperationError(get_error(space), space.newtext(exc.msg))
def _unpack(space, format, buf):
    """Unpack *buf* according to *format* and return a wrapped tuple.

    Errors reported by the format iterator are re-raised as app-level
    OverflowError or struct.error.
    """
    reader = UnpackFormatIterator(space, buf)
    try:
        reader.interpret(format)
    except StructOverflowError as exc:
        raise OperationError(space.w_OverflowError, space.newtext(exc.msg))
    except StructError as exc:
        raise OperationError(get_error(space), space.newtext(exc.msg))
    # Hand a copy of the collected items to the tuple.
    items_w = reader.result_w[:]
    return space.newtuple(items_w)
def unpack(space, w_format, w_str):
    # Unwrap the format object and delegate to do_unpack(), which reads
    # the data out of w_str and returns the unpacked values as a tuple.
    fmt = text_or_bytes_w(space, w_format)
    w_result = do_unpack(space, fmt, w_str)
    return w_result
def do_unpack(space, format, w_str):
    """Unpack the readable buffer of *w_str* according to *format*."""
    return _unpack(space, format, space.readbuf_w(w_str))
@unwrap_spec(offset=int)
def unpack_from(space, w_format, w_buffer, offset=0):
    """Unpack the buffer, containing packed C structure data, according to
    fmt, starting at offset. Requires len(buffer[offset:]) >= calcsize(fmt)."""
    # Only the format needs unwrapping here; offset handling and bounds
    # checks are performed by do_unpack_from().
    fmt = text_or_bytes_w(space, w_format)
    return do_unpack_from(space, fmt, w_buffer, offset)
def do_unpack_from(space, format, w_buffer, offset=0):
    """Unpack data from the buffer of *w_buffer*, starting at *offset*.

    Exactly the number of bytes computed for *format* is read.  A negative
    *offset* is shifted by the buffer length once it has been validated.
    Raises struct.error if the buffer does not hold enough data at the
    requested position.
    """
    nbytes = _calcsize(space, format)
    source = space.readbuf_w(w_buffer)
    total = source.getlength()
    if offset < 0:
        # The negative offset must be far enough from the end to cover
        # all of the data to unpack ...
        if offset + nbytes > 0:
            raise oefmt(get_error(space),
                        "not enough data to unpack %d bytes at offset %d",
                        nbytes, offset)
        # ... and must not reach back past the start of the buffer.
        if offset + total < 0:
            raise oefmt(get_error(space),
                        "offset %d out of range for %d-byte buffer",
                        offset, total)
        offset += total
    available = total - offset
    if available < nbytes:
        raise oefmt(get_error(space),
                    "unpack_from requires a buffer of at least %d bytes for "
                    "unpacking %d bytes at offset %d "
                    "(actual buffer size is %d)",
                    r_uint(nbytes + offset), nbytes, offset, total)
    # Restrict the view to exactly the bytes the format consumes.
    window = SubBuffer(source, offset, nbytes)
    return _unpack(space, format, window)
class W_UnpackIter(W_Root):
    """Iterator unpacking consecutive fixed-size records from a buffer.

    Each step unpacks ``w_struct.size`` bytes of the buffer with the
    format of the associated struct object and then advances by that size.
    """
    # Methods are documented with comments rather than docstrings so that
    # no new text is attached to the functions wrapped by interp2app.

    def __init__(self, space, w_struct, w_buffer):
        # The buffer is acquired first; the size checks follow.
        buf = space.readbuf_w(w_buffer)
        record_size = w_struct.size
        if record_size <= 0:
            raise oefmt(get_error(space),
                        "cannot iteratively unpack with a struct of length %d",
                        record_size)
        # The buffer must hold a whole number of records.
        if buf.getlength() % record_size != 0:
            raise oefmt(get_error(space),
                        "iterative unpacking requires a bytes length multiple of %d",
                        record_size)
        self.w_struct = w_struct
        self.buf = buf
        self.index = 0

    def descr_iter(self, space):
        # The object is its own iterator.
        return self

    def descr_next(self, space):
        # Stop when there is no struct attached or the buffer is used up.
        if self.w_struct is None or self.index >= self.buf.getlength():
            raise OperationError(space.w_StopIteration, space.w_None)
        w_struct = self.w_struct
        record_size = w_struct.size
        window = SubBuffer(self.buf, self.index, record_size)
        w_item = _unpack(space, w_struct.format, window)
        # Advance only after the record was unpacked successfully.
        self.index += record_size
        return w_item

    def descr_length_hint(self, space):
        # Number of whole records left between the current index and the
        # end of the buffer (0 when no struct is attached).
        if self.w_struct is None:
            return space.newint(0)
        remaining = self.buf.getlength() - self.index
        return space.newint(remaining // self.w_struct.size)
class W_Struct(W_Root):
    """Interp-level struct object holding a format string and its size.

    ``format`` and ``size`` are filled in by descr__init__(); until then
    they keep the class-level defaults ("" and -1).
    """
    # Methods are documented with comments rather than docstrings so that
    # no new text is attached to the functions wrapped by interp2app.

    _immutable_fields_ = ["format", "size"]

    format = ""
    size = -1

    def descr__new__(space, w_subtype, __args__):
        # Written without ``self``: the typedef wraps the plain function
        # (via .im_func).  Only allocates; initialisation is done later.
        return space.allocate_instance(W_Struct, w_subtype)

    def descr__init__(self, space, w_format):
        # The format is stored before its size is computed, so it is
        # already set if _calcsize() raises.
        fmt = text_or_bytes_w(space, w_format)
        self.format = fmt
        self.size = _calcsize(space, fmt)

    def descr_pack(self, space, args_w):
        fmt = jit.promote_string(self.format)
        return do_pack(space, fmt, args_w)

    @unwrap_spec(offset=int)
    def descr_pack_into(self, space, w_buffer, offset, args_w):
        fmt = jit.promote_string(self.format)
        return do_pack_into(space, fmt, w_buffer, offset, args_w)

    def descr_unpack(self, space, w_str):
        fmt = jit.promote_string(self.format)
        return do_unpack(space, fmt, w_str)

    @unwrap_spec(offset=int)
    def descr_unpack_from(self, space, w_buffer, offset=0):
        fmt = jit.promote_string(self.format)
        return do_unpack_from(space, fmt, w_buffer, offset)

    def descr_iter_unpack(self, space, w_buffer):
        # The iterator keeps a reference to this struct object.
        return W_UnpackIter(space, self, w_buffer)
# Type definition for W_Struct, registered under the name "Struct".
# Each keyword below maps an app-level attribute name to its
# interp-level implementation.
W_Struct.typedef = TypeDef("Struct",
    # descr__new__ is written without a ``self`` argument, so the
    # underlying plain function is taken with .im_func before wrapping.
    __new__=interp2app(W_Struct.descr__new__.im_func),
    __init__=interp2app(W_Struct.descr__init__),
    # Attributes backed by the instance fields "format" and "size",
    # wrapped with space.newtext / space.newint respectively.
    format=interp_attrproperty("format", cls=W_Struct, wrapfn="newtext"),
    size=interp_attrproperty("size", cls=W_Struct, wrapfn="newint"),
    pack=interp2app(W_Struct.descr_pack),
    unpack=interp2app(W_Struct.descr_unpack),
    pack_into=interp2app(W_Struct.descr_pack_into),
    unpack_from=interp2app(W_Struct.descr_unpack_from),
    iter_unpack=interp2app(W_Struct.descr_iter_unpack),
    __weakref__=make_weakref_descr(W_Struct),
)
# Type definition for W_UnpackIter, registered under the name
# "unpack_iterator"; it exposes __iter__, __next__ and __length_hint__.
W_UnpackIter.typedef = TypeDef("unpack_iterator",
    __iter__=interp2app(W_UnpackIter.descr_iter),
    __next__=interp2app(W_UnpackIter.descr_next),
    __length_hint__=interp2app(W_UnpackIter.descr_length_hint)
)
def iter_unpack(space, w_format, w_buffer):
    # Build a W_Struct for the given format, initialise it the same way
    # the Struct type's __init__ does, and iterate over the buffer with it.
    w_compiled = W_Struct()
    w_compiled.descr__init__(space, w_format)
    w_iterator = W_UnpackIter(space, w_compiled, w_buffer)
    return w_iterator
def clearcache(space):
    """No-op on PyPy"""
    # Nothing to clear here; the function only exists to be callable.
    return None
|