File: binaryreader.py

package info (click to toggle)
python-telethon 1.42.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,520 kB
  • sloc: python: 16,285; javascript: 200; makefile: 16; sh: 11
file content (201 lines) | stat: -rw-r--r-- 6,270 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""
This module contains the BinaryReader utility class.
"""
import struct
import time
from datetime import datetime, timedelta, timezone

from ..errors import TypeNotFoundError
from ..tl.alltlobjects import tlobjects
from ..tl.core import core_objects

_EPOCH_NAIVE = datetime(*time.gmtime(0)[:6])
_EPOCH = _EPOCH_NAIVE.replace(tzinfo=timezone.utc)


class BinaryReader:
    """
    Small utility class to read binary data.
    """

    def __init__(self, data):
        self.stream = data or b''
        self.position = 0
        self._last = None  # Should come in handy to spot -404 errors

    # region Reading

    # "All numbers are written as little endian."
    # https://core.telegram.org/mtproto
    def read_byte(self):
        """Reads a single byte value."""
        value, = struct.unpack_from("<B", self.stream, self.position)
        self.position += 1
        return value

    def read_int(self, signed=True):
        """Reads an integer (4 bytes) value."""
        fmt = '<i' if signed else '<I'
        value, = struct.unpack_from(fmt, self.stream, self.position)
        self.position += 4
        return value

    def read_long(self, signed=True):
        """Reads a long integer (8 bytes) value."""
        fmt = '<q' if signed else '<Q'
        value, = struct.unpack_from(fmt, self.stream, self.position)
        self.position += 8
        return value

    def read_float(self):
        """Reads a real floating point (4 bytes) value."""
        value, = struct.unpack_from("<f", self.stream, self.position)
        self.position += 4
        return value

    def read_double(self):
        """Reads a real floating point (8 bytes) value."""
        value, = struct.unpack_from("<d", self.stream, self.position)
        self.position += 8
        return value

    def read_large_int(self, bits, signed=True):
        """Reads a n-bits long integer value."""
        return int.from_bytes(
            self.read(bits // 8), byteorder='little', signed=signed)

    def read(self, length=-1):
        """Read the given amount of bytes, or -1 to read all remaining."""
        if length >= 0:
            result = self.stream[self.position:self.position + length]
            self.position += length
        else:
            result = self.stream[self.position:]
            self.position += len(result)
        if (length >= 0) and (len(result) != length):
            raise BufferError(
                'No more data left to read (need {}, got {}: {}); last read {}'
                .format(length, len(result), repr(result), repr(self._last))
            )

        self._last = result
        return result

    def get_bytes(self):
        """Gets the byte array representing the current buffer as a whole."""
        return self.stream

    # endregion

    # region Telegram custom reading

    def tgread_bytes(self):
        """
        Reads a Telegram-encoded byte array, without the need of
        specifying its length.
        """
        first_byte = self.read_byte()
        if first_byte == 254:
            length = self.read_byte() | (self.read_byte() << 8) | (
                self.read_byte() << 16)
            padding = length % 4
        else:
            length = first_byte
            padding = (length + 1) % 4

        data = self.read(length)
        if padding > 0:
            padding = 4 - padding
            self.read(padding)

        return data

    def tgread_string(self):
        """Reads a Telegram-encoded string."""
        return str(self.tgread_bytes(), encoding='utf-8', errors='replace')

    def tgread_bool(self):
        """Reads a Telegram boolean value."""
        value = self.read_int(signed=False)
        if value == 0x997275b5:  # boolTrue
            return True
        elif value == 0xbc799737:  # boolFalse
            return False
        else:
            raise RuntimeError('Invalid boolean code {}'.format(hex(value)))

    def tgread_date(self):
        """Reads and converts Unix time (used by Telegram)
           into a Python datetime object.
        """
        value = self.read_int()
        return _EPOCH + timedelta(seconds=value)

    def tgread_object(self):
        """Reads a Telegram object."""
        constructor_id = self.read_int(signed=False)
        clazz = tlobjects.get(constructor_id, None)
        if clazz is None:
            # The class was None, but there's still a
            # chance of it being a manually parsed value like bool!
            value = constructor_id
            if value == 0x997275b5:  # boolTrue
                return True
            elif value == 0xbc799737:  # boolFalse
                return False
            elif value == 0x1cb5c415:  # Vector
                return [self.tgread_object() for _ in range(self.read_int())]

            clazz = core_objects.get(constructor_id, None)
            if clazz is None:
                # If there was still no luck, give up
                self.seek(-4)  # Go back
                pos = self.tell_position()
                error = TypeNotFoundError(constructor_id, self.read())
                self.set_position(pos)
                raise error

        return clazz.from_reader(self)

    def tgread_vector(self):
        """Reads a vector (a list) of Telegram objects."""
        if 0x1cb5c415 != self.read_int(signed=False):
            raise RuntimeError('Invalid constructor code, vector was expected')

        count = self.read_int()
        return [self.tgread_object() for _ in range(count)]

    # endregion

    def close(self):
        """Closes the reader, freeing the BytesIO stream."""
        self.stream = b''

    # region Position related

    def tell_position(self):
        """Tells the current position on the stream."""
        return self.position

    def set_position(self, position):
        """Sets the current position on the stream."""
        self.position = position

    def seek(self, offset):
        """
        Seeks the stream position given an offset from the current position.
        The offset may be negative.
        """
        self.position += offset

    # endregion

    # region with block

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    # endregion