1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
|
# array._array_reconstructor is a special constructor used when
# unpickling an array. It provides a portable way to rebuild an array
# from its memory representation.
import sys
from pypy.interpreter.gateway import unwrap_spec
from pypy.interpreter.error import oefmt
from pypy.interpreter.argument import Arguments
from rpython.rlib import rutf8, rbigint
from rpython.rlib.rstruct import ieee
from rpython.rtyper.lltypesystem import rffi
from pypy.module.array import interp_array
UNKNOWN_FORMAT = -1
UNSIGNED_INT8 = 0
SIGNED_INT8 = 1
UNSIGNED_INT16_LE = 2
UNSIGNED_INT16_BE = 3
SIGNED_INT16_LE = 4
SIGNED_INT16_BE = 5
UNSIGNED_INT32_LE = 6
UNSIGNED_INT32_BE = 7
SIGNED_INT32_LE = 8
SIGNED_INT32_BE = 9
UNSIGNED_INT64_LE = 10
UNSIGNED_INT64_BE = 11
SIGNED_INT64_LE = 12
SIGNED_INT64_BE = 13
IEEE_754_FLOAT_LE = 14
IEEE_754_FLOAT_BE = 15
IEEE_754_DOUBLE_LE = 16
IEEE_754_DOUBLE_BE = 17
UTF16_LE = 18
UTF16_BE = 19
UTF32_LE = 20
UTF32_BE = 21
IS_BIG_ENDIAN = sys.byteorder == 'big'
class MachineFormat(object):
def __init__(self, bytes, signed, big_endian):
self.bytes = bytes
self.signed = signed
self.big_endian = big_endian
format_descriptors = {
UNSIGNED_INT8: MachineFormat(1, False, False),
SIGNED_INT8: MachineFormat(1, True, False),
UNSIGNED_INT16_LE: MachineFormat(2, False, False),
UNSIGNED_INT16_BE: MachineFormat(2, False, True),
SIGNED_INT16_LE: MachineFormat(2, True, False),
SIGNED_INT16_BE: MachineFormat(2, True, True),
UNSIGNED_INT32_LE: MachineFormat(4, False, False),
UNSIGNED_INT32_BE: MachineFormat(4, False, True),
SIGNED_INT32_LE: MachineFormat(4, True, False),
SIGNED_INT32_BE: MachineFormat(4, True, True),
UNSIGNED_INT64_LE: MachineFormat(8, False, False),
UNSIGNED_INT64_BE: MachineFormat(8, False, True),
SIGNED_INT64_LE: MachineFormat(8, True, False),
SIGNED_INT64_BE: MachineFormat(8, True, True),
IEEE_754_FLOAT_LE: MachineFormat(4, False, False),
IEEE_754_FLOAT_BE: MachineFormat(4, False, True),
IEEE_754_DOUBLE_LE: MachineFormat(8, False, False),
IEEE_754_DOUBLE_BE: MachineFormat(8, False, True),
UTF16_LE: MachineFormat(4, False, False),
UTF16_BE: MachineFormat(4, False, True),
UTF32_LE: MachineFormat(8, False, False),
UTF32_BE: MachineFormat(8, False, True),
}
MACHINE_FORMAT_CODE_MIN = min(format_descriptors)
MACHINE_FORMAT_CODE_MAX = max(format_descriptors)
@unwrap_spec(typecode='text', mformat_code=int)
def array_reconstructor(space, w_cls, typecode, mformat_code, w_items):
# Fast path: machine format code corresponds to the
# platform-independent typecode.
if mformat_code == typecode_to_mformat_code(typecode):
return interp_array.w_array(
space, w_cls, typecode, Arguments(space, [w_items]))
if typecode not in interp_array.types:
raise oefmt(space.w_ValueError, "invalid type code")
if (mformat_code < MACHINE_FORMAT_CODE_MIN or
mformat_code > MACHINE_FORMAT_CODE_MAX):
raise oefmt(space.w_ValueError, "invalid machine format code")
# Slow path: Decode the byte string according to the given machine
# format code. This occurs when the computer unpickling the array
# object is architecturally different from the one that pickled
# the array.
if (mformat_code == IEEE_754_FLOAT_LE or
mformat_code == IEEE_754_FLOAT_BE or
mformat_code == IEEE_754_DOUBLE_LE or
mformat_code == IEEE_754_DOUBLE_BE):
descr = format_descriptors[mformat_code]
memstr = space.bytes_w(w_items)
step = descr.bytes
converted_items = [
space.newfloat(ieee.unpack_float(
memstr[i:i+step],
descr.big_endian))
for i in range(0, len(memstr), step)]
w_converted_items = space.newlist(converted_items)
elif mformat_code == UTF16_LE:
w_converted_items = space.call_method(
w_items, "decode", space.newtext("utf-16-le"))
elif mformat_code == UTF16_BE:
w_converted_items = space.call_method(
w_items, "decode", space.newtext("utf-16-be"))
elif mformat_code == UTF32_LE:
w_converted_items = space.call_method(
w_items, "decode", space.newtext("utf-32-le"))
elif mformat_code == UTF32_BE:
w_converted_items = space.call_method(
w_items, "decode", space.newtext("utf-32-be"))
else:
descr = format_descriptors[mformat_code]
# If possible, try to pack array's items using a data type
# that fits better. This may result in an array with narrower
# or wider elements.
#
# For example, if a 32-bit machine pickles a L-code array of
# unsigned longs, then the array will be unpickled by 64-bit
# machine as an I-code array of unsigned ints.
#
# XXX: Is it possible to write a unit test for this?
for tc in interp_array.unroll_typecodes:
typecode_descr = interp_array.types[tc]
if (typecode_descr.is_integer_type() and
typecode_descr.bytes == descr.bytes and
typecode_descr.signed == descr.signed):
typecode = tc
break
memstr = space.bytes_w(w_items)
step = descr.bytes
converted_items = [
space.newlong_from_rbigint(rbigint.rbigint.frombytes(
memstr[i:i+step],
descr.big_endian and 'big' or 'little',
descr.signed))
for i in range(0, len(memstr), step)]
w_converted_items = space.newlist(converted_items)
return interp_array.w_array(
space, w_cls, typecode, Arguments(space, [w_converted_items]))
def typecode_to_mformat_code(typecode):
intsize = 0
if typecode == 'b':
return SIGNED_INT8
elif typecode == 'B':
return UNSIGNED_INT8
elif typecode == 'u':
if rutf8.MAXUNICODE == 0xffff:
return UTF16_LE + IS_BIG_ENDIAN
else:
return UTF32_LE + IS_BIG_ENDIAN
elif typecode == 'f':
return IEEE_754_FLOAT_LE + IS_BIG_ENDIAN
elif typecode == 'd':
return IEEE_754_DOUBLE_LE + IS_BIG_ENDIAN
# Integers
elif typecode == 'h':
intsize = rffi.sizeof(rffi.SHORT)
is_signed = True
elif typecode == 'H':
intsize = rffi.sizeof(rffi.SHORT)
is_signed = False
elif typecode == 'i':
intsize = rffi.sizeof(rffi.INT)
is_signed = True
elif typecode == 'I':
intsize = rffi.sizeof(rffi.INT)
is_signed = False
elif typecode == 'l':
intsize = rffi.sizeof(rffi.LONG)
is_signed = True
elif typecode == 'L':
intsize = rffi.sizeof(rffi.LONG)
is_signed = False
elif typecode == 'q':
intsize = rffi.sizeof(rffi.LONGLONG)
is_signed = True
elif typecode == 'Q':
intsize = rffi.sizeof(rffi.LONGLONG)
is_signed = False
else:
return UNKNOWN_FORMAT
if intsize == 2:
return UNSIGNED_INT16_LE + IS_BIG_ENDIAN + (2 * is_signed)
elif intsize == 4:
return UNSIGNED_INT32_LE + IS_BIG_ENDIAN + (2 * is_signed)
elif intsize == 8:
return UNSIGNED_INT64_LE + IS_BIG_ENDIAN + (2 * is_signed)
return UNKNOWN_FORMAT
|