1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
"""WARNING! This module is incomplete and may have rough edges. Use it only
if necessary.
"""
import py
from struct import unpack
from rpython.rlib.rstruct.formatiterator import FormatIterator
from rpython.rlib.rstruct.error import StructError
class MasterReader(object):
    """Cursor over an input string that hands out consecutive slices."""

    def __init__(self, s):
        self.input = s
        # index of the next unread character in self.input
        self.inputpos = 0

    def read(self, count):
        """Return the next `count` characters and move the cursor past them.

        Raises StructError when fewer than `count` characters remain; the
        cursor is left where it was in that case.
        """
        start = self.inputpos
        stop = start + count
        if stop > len(self.input):
            raise StructError("unpack str size too short for format")
        self.inputpos = stop
        return self.input[start:stop]

    def align(self, mask):
        """Move the cursor forward to an aligned position.

        Adds `mask` to the cursor and clears the bits set in `mask`; for a
        mask of the form 2**k - 1 this rounds the cursor up to a multiple of
        2**k.
        """
        bumped = self.inputpos + mask
        self.inputpos = bumped & ~mask
class AbstractReader(object):
    """Common base class of the per-position reader classes."""
def reader_for_pos(pos):
    """Create and return a new reader class named ``ReaderForPos<pos>``.

    Every call builds a distinct subclass of AbstractReader.
    NOTE(review): presumably one class per format position lets each
    reader's ``value`` attribute be typed independently by the RPython
    annotator -- confirm.
    """
    class_name = 'ReaderForPos%d' % pos

    class ReaderForPos(AbstractReader):
        """Reads through a shared master reader and keeps one result."""

        def __init__(self, mr, bigendian):
            self.bigendian = bigendian
            self.mr = mr

        def appendobj(self, value):
            # remember the value handed over; a later call overwrites it
            self.value = value

        def read(self, count):
            # all actual reading is delegated to the shared master reader
            master = self.mr
            return master.read(count)

    ReaderForPos.__name__ = class_name
    return ReaderForPos
class FrozenUnpackIterator(FormatIterator):
    """Format iterator that records a format and compiles an unpacker for it.

    ``operate`` and ``align`` collect ``(fmtdesc, repetitions, mask)`` entries
    in ``self.formats``.  ``_freeze_`` then generates the source of an
    ``unpack(s)`` function specialised for exactly those entries, compiles it
    and installs it as ``self.unpack``, replacing the struct-based fallback
    method defined on the class.
    """

    def __init__(self, fmt):
        # one (fmtdesc, repetitions, alignment mask or None) tuple per item
        self.formats = []
        self.fmt = fmt

    def operate(self, fmtdesc, repetitions):
        """Record one format item.

        Descriptors with ``needcount`` set are stored once together with their
        repetition count; all others are expanded into `repetitions` separate
        entries with a count of 1.
        """
        if fmtdesc.needcount:
            self.formats.append((fmtdesc, repetitions, None))
        else:
            self.formats.extend([(fmtdesc, 1, None)] * repetitions)

    def align(self, mask):
        """Store `mask` on the most recently recorded entry, if there is one.

        NOTE(review): the mask lands on the entry recorded *before* this call,
        and _create_unpacking_func emits the align call in front of that
        entry's unpacker.  Whether this is the intended read position depends
        on when FormatIterator.interpret calls align relative to operate,
        which is not visible here -- confirm.
        """
        if self.formats:
            fmtdesc, rep, _ = self.formats[-1]
            self.formats[-1] = (fmtdesc, rep, mask)

    def _create_unpacking_func(self):
        """Generate, compile and install the specialised ``unpack(s)``.

        The generated function creates a MasterReader over ``s`` and one
        reader object per recorded entry, runs each entry's ``unpack`` in
        order (preceded by ``master_reader.align(mask)`` where a mask was
        stored) and returns the readers' ``value`` attributes: the bare value
        for a single entry, a tuple otherwise.
        """
        # The generated code runs in a copy of this module's globals, so it
        # can see MasterReader, plus the per-entry helpers added below.
        miniglobals = {}
        miniglobals.update(globals())
        # assumes self.bigendian was set while the format was interpreted
        # (not assigned anywhere in this class) -- TODO confirm
        miniglobals['bigendian'] = self.bigendian

        readers_lst = []
        perform_lst = []
        values_lst = []
        for i, (fmtdesc, rep, mask) in enumerate(self.formats):
            miniglobals['unpacker%d' % i] = fmtdesc.unpack
            miniglobals['reader_cls%d' % i] = reader_for_pos(i)
            readers_lst.append(
                "reader%d = reader_cls%d(master_reader, bigendian)" % (i, i))
            if mask is not None:
                perform_lst.append('master_reader.align(%d)' % mask)
            if fmtdesc.needcount:
                perform_lst.append('unpacker%d(reader%d, %d)' % (i, i, rep))
            else:
                perform_lst.append('unpacker%d(reader%d)' % (i, i))
            values_lst.append('reader%d.value' % i)

        # Fill the template from an explicit mapping rather than locals(), so
        # renaming a local cannot silently break the generated source.
        source = py.code.Source("""
        def unpack(s):
            master_reader = MasterReader(s)
            %(readers)s
            %(perform)s
            return (%(unpackers)s)
        """ % {
            'readers': ";".join(readers_lst),
            'perform': ";".join(perform_lst),
            'unpackers': ','.join(values_lst),
        })
        # Tuple form of exec: equivalent to "exec code in miniglobals" on
        # Python 2.7 and also valid syntax on Python 3.
        exec(source.compile(), miniglobals)
        self.unpack = miniglobals['unpack']  # override not-rpython version

    def unpack(self, s):
        """Fallback that unpacks `s` with the stdlib ``struct.unpack``.

        Returns the single value directly when the format yields exactly one
        item, otherwise the tuple of values.  Shadowed on the instance once
        _create_unpacking_func has run.
        """
        # NOT_RPYTHON
        res = unpack(self.fmt, s)
        if len(res) == 1:
            return res[0]
        return res

    def _freeze_(self):
        """Build the specialised unpacker and return True.

        NOTE(review): ``_freeze_`` is presumably the RPython freezing hook
        invoked by the translation toolchain -- confirm against RPython docs.
        """
        assert self.formats, "no format items were recorded before freezing"
        self._create_unpacking_func()
        return True
def create_unpacker(unpack_str):
    """Return a FrozenUnpackIterator that has interpreted `unpack_str`."""
    unpacker = FrozenUnpackIterator(unpack_str)
    unpacker.interpret(unpack_str)
    return unpacker
# annotation hint string; its handling is outside this file
create_unpacker._annspecialcase_ = 'specialize:memo'
def runpack(fmt, input):
    """Unpack `input` according to the format string `fmt`.

    Obtains the unpacker for `fmt` from create_unpacker and returns whatever
    its ``unpack`` gives back for `input`.
    """
    return create_unpacker(fmt).unpack(input)
# annotation hint string; its handling is outside this file
runpack._annspecialcase_ = 'specialize:arg(0)'
|