File: runpack.py

package info (click to toggle)
pypy 5.6.0%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 97,040 kB
  • ctags: 185,069
  • sloc: python: 1,147,862; ansic: 49,642; cpp: 5,245; asm: 5,169; makefile: 529; sh: 481; xml: 232; lisp: 45
file content (113 lines) | stat: -rw-r--r-- 3,562 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113

""" WARNING! this module is incomplete and may have rough edges. Use only
if necessary
"""

import py
from struct import unpack
from rpython.rlib.rstruct.formatiterator import FormatIterator
from rpython.rlib.rstruct.error import StructError
from rpython.rlib.objectmodel import specialize

class MasterReader(object):
    def __init__(self, s):
        self.input = s
        self.inputpos = 0

    def read(self, count):
        end = self.inputpos + count
        if end > len(self.input):
            raise StructError("unpack str size too short for format")
        s = self.input[self.inputpos : end]
        self.inputpos = end
        return s

    def align(self, mask):
        self.inputpos = (self.inputpos + mask) & ~mask

class AbstractReader(object):
    pass

def reader_for_pos(pos):
    class ReaderForPos(AbstractReader):
        def __init__(self, mr, bigendian):
            self.mr = mr
            self.bigendian = bigendian

        def read(self, count):
            return self.mr.read(count)

        def appendobj(self, value):
            self.value = value

        def get_buffer_as_string_maybe(self):
            return self.mr.input, self.mr.inputpos

        def skip(self, size):
            self.read(size) # XXX, could avoid taking the slice
    ReaderForPos.__name__ = 'ReaderForPos%d' % pos
    return ReaderForPos

class FrozenUnpackIterator(FormatIterator):
    def __init__(self, fmt):
        self.formats = []
        self.fmt = fmt

    def operate(self, fmtdesc, repetitions):
        if fmtdesc.needcount:
            self.formats.append((fmtdesc, repetitions, None))
        else:
            for i in range(repetitions):
                self.formats.append((fmtdesc, 1, None))

    def align(self, mask):
        if self.formats:
            fmt, rep, _ = self.formats.pop()
            self.formats.append((fmt, rep, mask))

    def _create_unpacking_func(self):
        rg = range(len(self.formats))
        perform_lst = []
        miniglobals = {}
        miniglobals.update(globals())
        miniglobals['bigendian'] = self.bigendian
        for i in rg:
            fmtdesc, rep, mask = self.formats[i]
            miniglobals['unpacker%d' % i] = fmtdesc.unpack
            if mask is not None:
                perform_lst.append('master_reader.align(%d)' % mask)
            if not fmtdesc.needcount:
                perform_lst.append('unpacker%d(reader%d)' % (i, i))
            else:
                perform_lst.append('unpacker%d(reader%d, %d)' % (i, i, rep))
            miniglobals['reader_cls%d' % i] = reader_for_pos(i)
        readers = ";".join(["reader%d = reader_cls%d(master_reader, bigendian)"
                            % (i, i) for i in rg])
        perform = ";".join(perform_lst)
        unpackers = ','.join(['reader%d.value' % i for i in rg])
        source = py.code.Source("""
        def unpack(s):
            master_reader = MasterReader(s)
            %(readers)s
            %(perform)s
            return (%(unpackers)s)
        """ % locals())
        exec source.compile() in miniglobals
        self.unpack = miniglobals['unpack'] # override not-rpython version

    def _freeze_(self):
        assert self.formats
        self._create_unpacking_func()
        return True

@specialize.memo()
def create_unpacker(unpack_str):
    fmtiter = FrozenUnpackIterator(unpack_str)
    fmtiter.interpret(unpack_str)
    assert fmtiter._freeze_()
    return fmtiter

@specialize.arg(0)
def runpack(fmt, input):
    unpacker = create_unpacker(fmt)
    return unpacker.unpack(input)