File: runpack.py

package info (click to toggle)
pypy 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 107,216 kB
  • sloc: python: 1,201,787; ansic: 62,419; asm: 5,169; cpp: 3,017; sh: 2,534; makefile: 545; xml: 243; lisp: 45; awk: 4
file content (125 lines) | stat: -rw-r--r-- 3,932 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125

""" WARNING! this module is incomplete and may have rough edges. Use only
if necessary
"""

import py
from struct import unpack
from rpython.rlib.rstruct.formatiterator import FormatIterator
from rpython.rlib.rstruct.error import StructError
from rpython.rlib.objectmodel import specialize
from rpython.rlib.buffer import StringBuffer

class MasterReader(object):
    def __init__(self, s):
        self.inputbuf = StringBuffer(s)
        self.length = len(s)
        self.inputpos = 0

    def can_advance(self, count):
        end = self.inputpos + count
        return end <= self.length

    def advance(self, count):
        if not self.can_advance(count):
            raise StructError("unpack str size too short for format")
        self.inputpos += count

    def read(self, count):
        curpos = self.inputpos
        end = curpos + count
        self.advance(count) # raise if we are out of bound
        return self.inputbuf.getslice(curpos, end, 1, count)

    def align(self, mask):
        self.inputpos = (self.inputpos + mask) & ~mask

class AbstractReader(object):
    pass

def reader_for_pos(pos):
    class ReaderForPos(AbstractReader):
        def __init__(self, mr, bigendian):
            self.mr = mr
            self.bigendian = bigendian

        def read(self, count):
            return self.mr.read(count)

        def appendobj(self, value):
            self.value = value

        def get_buffer_and_pos(self):
            return self.mr.inputbuf, self.mr.inputpos

        def can_advance(self, size):
            return self.mr.can_advance(size)

        def advance(self, size):
            self.mr.advance(size)
    ReaderForPos.__name__ = 'ReaderForPos%d' % pos
    return ReaderForPos

class FrozenUnpackIterator(FormatIterator):
    def __init__(self, fmt):
        self.formats = []
        self.fmt = fmt

    def operate(self, fmtdesc, repetitions):
        if fmtdesc.needcount:
            self.formats.append((fmtdesc, repetitions, None))
        else:
            for i in range(repetitions):
                self.formats.append((fmtdesc, 1, None))

    def align(self, mask):
        if self.formats:
            fmt, rep, _ = self.formats.pop()
            self.formats.append((fmt, rep, mask))

    def _create_unpacking_func(self):
        rg = range(len(self.formats))
        perform_lst = []
        miniglobals = {}
        miniglobals.update(globals())
        miniglobals['bigendian'] = self.bigendian
        for i in rg:
            fmtdesc, rep, mask = self.formats[i]
            miniglobals['unpacker%d' % i] = fmtdesc.unpack
            if mask is not None:
                perform_lst.append('master_reader.align(%d)' % mask)
            if not fmtdesc.needcount:
                perform_lst.append('unpacker%d(reader%d)' % (i, i))
            else:
                perform_lst.append('unpacker%d(reader%d, %d)' % (i, i, rep))
            miniglobals['reader_cls%d' % i] = reader_for_pos(i)
        readers = ";".join(["reader%d = reader_cls%d(master_reader, bigendian)"
                            % (i, i) for i in rg])
        perform = ";".join(perform_lst)
        unpackers = ','.join(['reader%d.value' % i for i in rg])
        source = py.code.Source("""
        def unpack(s):
            master_reader = MasterReader(s)
            %(readers)s
            %(perform)s
            return (%(unpackers)s)
        """ % locals())
        exec source.compile() in miniglobals
        self.unpack = miniglobals['unpack'] # override not-rpython version

    def _freeze_(self):
        assert self.formats
        self._create_unpacking_func()
        return True

@specialize.memo()
def create_unpacker(unpack_str):
    fmtiter = FrozenUnpackIterator(unpack_str)
    fmtiter.interpret(unpack_str)
    assert fmtiter._freeze_()
    return fmtiter

@specialize.arg(0)
def runpack(fmt, input):
    unpacker = create_unpacker(fmt)
    return unpacker.unpack(input)