File: marshaller.py

package info (click to toggle)
python-dbus-next 0.2.3-6
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 704 kB
  • sloc: python: 6,018; makefile: 45; xml: 29
file content (160 lines) | stat: -rw-r--r-- 5,051 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from ..signature import SignatureTree
from struct import pack


class Marshaller:
    def __init__(self, signature, body):
        self.signature_tree = SignatureTree._get(signature)
        self.signature_tree.verify(body)
        self.buffer = bytearray()
        self.body = body

        self.writers = {
            'y': self.write_byte,
            'b': self.write_boolean,
            'n': self.write_int16,
            'q': self.write_uint16,
            'i': self.write_int32,
            'u': self.write_uint32,
            'x': self.write_int64,
            't': self.write_uint64,
            'd': self.write_double,
            'h': self.write_uint32,
            'o': self.write_string,
            's': self.write_string,
            'g': self.write_signature,
            'a': self.write_array,
            '(': self.write_struct,
            '{': self.write_dict_entry,
            'v': self.write_variant
        }

    def align(self, n):
        offset = n - len(self.buffer) % n
        if offset == 0 or offset == n:
            return 0
        self.buffer.extend(bytes(offset))
        return offset

    def write_byte(self, byte, _=None):
        self.buffer.append(byte)
        return 1

    def write_boolean(self, boolean, _=None):
        if boolean:
            return self.write_uint32(1)
        else:
            return self.write_uint32(0)

    def write_int16(self, int16, _=None):
        written = self.align(2)
        self.buffer.extend(pack('<h', int16))
        return written + 2

    def write_uint16(self, uint16, _=None):
        written = self.align(2)
        self.buffer.extend(pack('<H', uint16))
        return written + 2

    def write_int32(self, int32, _):
        written = self.align(4)
        self.buffer.extend(pack('<i', int32))
        return written + 4

    def write_uint32(self, uint32, _=None):
        written = self.align(4)
        self.buffer.extend(pack('<I', uint32))
        return written + 4

    def write_int64(self, int64, _=None):
        written = self.align(8)
        self.buffer.extend(pack('<q', int64))
        return written + 8

    def write_uint64(self, uint64, _=None):
        written = self.align(8)
        self.buffer.extend(pack('<Q', uint64))
        return written + 8

    def write_double(self, double, _=None):
        written = self.align(8)
        self.buffer.extend(pack('<d', double))
        return written + 8

    def write_signature(self, signature, _=None):
        signature = signature.encode()
        signature_len = len(signature)
        self.buffer.append(signature_len)
        self.buffer.extend(signature)
        self.buffer.append(0)
        return signature_len + 2

    def write_string(self, value, _=None):
        value = value.encode()
        value_len = len(value)
        written = self.write_uint32(value_len)
        self.buffer.extend(value)
        written += value_len
        self.buffer.append(0)
        written += 1
        return written

    def write_variant(self, variant, _=None):
        written = self.write_signature(variant.signature)
        written += self.write_single(variant.type, variant.value)
        return written

    def write_array(self, array, type_):
        # TODO max array size is 64MiB (67108864 bytes)
        written = self.align(4)
        # length placeholder
        offset = len(self.buffer)
        written += self.write_uint32(0)
        child_type = type_.children[0]

        if child_type.token in 'xtd{(':
            # the first alignment is not included in array size
            written += self.align(8)

        array_len = 0
        if child_type.token == '{':
            for key, value in array.items():
                array_len += self.write_dict_entry([key, value], child_type)
        elif child_type.token == 'y':
            array_len = len(array)
            self.buffer.extend(array)
        else:
            for value in array:
                array_len += self.write_single(child_type, value)

        array_len_packed = pack('<I', array_len)
        for i in range(offset, offset + 4):
            self.buffer[i] = array_len_packed[i - offset]

        return written + array_len

    def write_struct(self, array, type_):
        written = self.align(8)
        for i, value in enumerate(array):
            written += self.write_single(type_.children[i], value)
        return written

    def write_dict_entry(self, dict_entry, type_):
        written = self.align(8)
        written += self.write_single(type_.children[0], dict_entry[0])
        written += self.write_single(type_.children[1], dict_entry[1])
        return written

    def write_single(self, type_, body):
        t = type_.token

        if t not in self.writers:
            raise NotImplementedError(f'type isnt implemented yet: "{t}"')

        return self.writers[t](body, type_)

    def marshall(self):
        self.buffer.clear()
        for i, type_ in enumerate(self.signature_tree.types):
            self.write_single(type_, self.body[i])
        return self.buffer