File: connection.py

package info (click to toggle)
python-telethon 1.42.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,520 kB
  • sloc: python: 16,285; javascript: 200; makefile: 16; sh: 11
file content (440 lines) | stat: -rw-r--r-- 16,161 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
import abc
import asyncio
import socket
import sys

try:
    import ssl as ssl_mod
except ImportError:
    ssl_mod = None

try:
    import python_socks
except ImportError:
    python_socks = None

from ...errors import InvalidChecksumError, InvalidBufferError
from ... import helpers


class Connection(abc.ABC):
    """
    The `Connection` class is a wrapper around ``asyncio.open_connection``.

    Subclasses will implement different transport modes as atomic operations,
    which this class eases doing since the exposed interface simply puts and
    gets complete data payloads to and from queues.

    The only error that will raise from send and receive methods is
    ``ConnectionError``, which will raise when attempting to send if
    the client is disconnected (includes remote disconnections).
    """
    # this static attribute should be redefined by `Connection` subclasses and
    # should be one of `PacketCodec` implementations
    packet_codec = None

    def __init__(self, ip, port, dc_id, *, loggers, proxy=None, local_addr=None):
        self._ip = ip
        self._port = port
        self._dc_id = dc_id  # only for MTProxy, it's an abstraction leak
        self._log = loggers[__name__]
        self._proxy = proxy
        self._local_addr = local_addr
        self._reader = None
        self._writer = None
        self._connected = False
        self._send_task = None
        self._recv_task = None
        self._codec = None
        self._obfuscation = None  # TcpObfuscated and MTProxy
        self._send_queue = asyncio.Queue(1)
        self._recv_queue = asyncio.Queue(1)

    @staticmethod
    def _wrap_socket_ssl(sock):
        if ssl_mod is None:
            raise RuntimeError(
                'Cannot use proxy that requires SSL '
                'without the SSL module being available'
            )

        return ssl_mod.wrap_socket(
            sock,
            do_handshake_on_connect=True,
            ssl_version=ssl_mod.PROTOCOL_SSLv23,
            ciphers='ADH-AES256-SHA')

    @staticmethod
    def _parse_proxy(proxy_type, addr, port, rdns=True, username=None, password=None):
        if isinstance(proxy_type, str):
            proxy_type = proxy_type.lower()

        # Always prefer `python_socks` when available
        if python_socks:
            from python_socks import ProxyType

            # We do the check for numerical values here
            # to be backwards compatible with PySocks proxy format,
            # (since socks.SOCKS5 == 2, socks.SOCKS4 == 1, socks.HTTP == 3)
            if proxy_type == ProxyType.SOCKS5 or proxy_type == 2 or proxy_type == "socks5":
                protocol = ProxyType.SOCKS5
            elif proxy_type == ProxyType.SOCKS4 or proxy_type == 1 or proxy_type == "socks4":
                protocol = ProxyType.SOCKS4
            elif proxy_type == ProxyType.HTTP or proxy_type == 3 or proxy_type == "http":
                protocol = ProxyType.HTTP
            else:
                raise ValueError("Unknown proxy protocol type: {}".format(proxy_type))

            # This tuple must be compatible with `python_socks`' `Proxy.create()` signature
            return protocol, addr, port, username, password, rdns

        else:
            from socks import SOCKS5, SOCKS4, HTTP

            if proxy_type == 2 or proxy_type == "socks5":
                protocol = SOCKS5
            elif proxy_type == 1 or proxy_type == "socks4":
                protocol = SOCKS4
            elif proxy_type == 3 or proxy_type == "http":
                protocol = HTTP
            else:
                raise ValueError("Unknown proxy protocol type: {}".format(proxy_type))

            # This tuple must be compatible with `PySocks`' `socksocket.set_proxy()` signature
            return protocol, addr, port, rdns, username, password

    async def _proxy_connect(self, timeout=None, local_addr=None):
        if isinstance(self._proxy, (tuple, list)):
            parsed = self._parse_proxy(*self._proxy)
        elif isinstance(self._proxy, dict):
            parsed = self._parse_proxy(**self._proxy)
        else:
            raise TypeError("Proxy of unknown format: {}".format(type(self._proxy)))

        # Always prefer `python_socks` when available
        if python_socks:
            # python_socks internal errors are not inherited from
            # builtin IOError (just from Exception). Instead of adding those
            # in exceptions clauses everywhere through the code, we
            # rather monkey-patch them in place. Keep in mind that
            # ProxyError takes error_code as keyword argument.

            class ConnectionErrorExtra(ConnectionError):
                def __init__(self, message, error_code=None):
                    super().__init__(message)
                    self.error_code = error_code

            python_socks._errors.ProxyError = ConnectionErrorExtra
            python_socks._errors.ProxyConnectionError = ConnectionError
            python_socks._errors.ProxyTimeoutError = ConnectionError

            from python_socks.async_.asyncio import Proxy

            proxy = Proxy.create(*parsed)

            # WARNING: If `local_addr` is set we use manual socket creation, because,
            # unfortunately, `Proxy.connect()` does not expose `local_addr`
            # argument, so if we want to bind socket locally, we need to manually
            # create, bind and connect socket, and then pass to `Proxy.connect()` method.

            if local_addr is None:
                sock = await proxy.connect(
                    dest_host=self._ip,
                    dest_port=self._port,
                    timeout=timeout
                )
            else:
                # Here we start manual setup of the socket.
                # The `address` represents the proxy ip and proxy port,
                # not the destination one (!), because the socket
                # connects to the proxy server, not destination server.
                # IPv family is also checked on proxy address.
                if ':' in proxy.proxy_host:
                    mode, address = socket.AF_INET6, (proxy.proxy_host, proxy.proxy_port, 0, 0)
                else:
                    mode, address = socket.AF_INET, (proxy.proxy_host, proxy.proxy_port)

                # Create a non-blocking socket and bind it (if local address is specified).
                sock = socket.socket(mode, socket.SOCK_STREAM)
                sock.setblocking(False)
                sock.bind(local_addr)

                # Actual TCP connection is performed here.
                await asyncio.wait_for(
                    helpers.get_running_loop().sock_connect(sock=sock, address=address),
                    timeout=timeout
                )

                # As our socket is already created and connected,
                # this call sets the destination host/port and
                # starts protocol negotiations with the proxy server.
                sock = await proxy.connect(
                    dest_host=self._ip,
                    dest_port=self._port,
                    timeout=timeout,
                    _socket=sock
                )

        else:
            import socks

            # Here `address` represents destination address (not proxy), because of
            # the `PySocks` implementation of the connection routine.
            # IPv family is checked on proxy address, not destination address.
            if ':' in parsed[1]:
                mode, address = socket.AF_INET6, (self._ip, self._port, 0, 0)
            else:
                mode, address = socket.AF_INET, (self._ip, self._port)

            # Setup socket, proxy, timeout and bind it (if necessary).
            sock = socks.socksocket(mode, socket.SOCK_STREAM)
            sock.set_proxy(*parsed)
            sock.settimeout(timeout)

            if local_addr is not None:
                sock.bind(local_addr)

            # Actual TCP connection and negotiation performed here.
            await asyncio.wait_for(
                helpers.get_running_loop().sock_connect(sock=sock, address=address),
                timeout=timeout
            )

            sock.setblocking(False)

        return sock

    async def _connect(self, timeout=None, ssl=None):
        if self._local_addr is not None:
            # NOTE: If port is not specified, we use 0 port
            # to notify the OS that port should be chosen randomly
            # from the available ones.
            if isinstance(self._local_addr, tuple) and len(self._local_addr) == 2:
                local_addr = self._local_addr
            elif isinstance(self._local_addr, str):
                local_addr = (self._local_addr, 0)
            else:
                raise ValueError("Unknown local address format: {}".format(self._local_addr))
        else:
            local_addr = None

        if not self._proxy:
            self._reader, self._writer = await asyncio.wait_for(
                asyncio.open_connection(
                    host=self._ip,
                    port=self._port,
                    ssl=ssl,
                    local_addr=local_addr
                ), timeout=timeout)
        else:
            # Proxy setup, connection and negotiation is performed here.
            sock = await self._proxy_connect(
                timeout=timeout,
                local_addr=local_addr
            )

            # Wrap socket in SSL context (if provided)
            if ssl:
                sock = self._wrap_socket_ssl(sock)

            self._reader, self._writer = await asyncio.open_connection(sock=sock)

        self._codec = self.packet_codec(self)
        self._init_conn()
        await self._writer.drain()

    async def connect(self, timeout=None, ssl=None):
        """
        Establishes a connection with the server.
        """
        await self._connect(timeout=timeout, ssl=ssl)
        self._connected = True

        loop = helpers.get_running_loop()
        self._send_task = loop.create_task(self._send_loop())
        self._recv_task = loop.create_task(self._recv_loop())

    async def disconnect(self):
        """
        Disconnects from the server, and clears
        pending outgoing and incoming messages.
        """
        if not self._connected:
            return

        self._connected = False

        await helpers._cancel(
            self._log,
            send_task=self._send_task,
            recv_task=self._recv_task
        )

        if self._writer:
            self._writer.close()
            if sys.version_info >= (3, 7):
                try:
                    await asyncio.wait_for(self._writer.wait_closed(), timeout=10)
                except asyncio.TimeoutError:
                    # See issue #3917. For some users, this line was hanging indefinitely.
                    # The hard timeout is not ideal (connection won't be properly closed),
                    # but the code will at least be able to procceed.
                    self._log.warning('Graceful disconnection timed out, forcibly ignoring cleanup')
                except Exception as e:
                    # Disconnecting should never raise. Seen:
                    # * OSError: No route to host and
                    # * OSError: [Errno 32] Broken pipe
                    # * ConnectionResetError
                    self._log.info('%s during disconnect: %s', type(e), e)

    def send(self, data):
        """
        Sends a packet of data through this connection mode.

        This method returns a coroutine.
        """
        if not self._connected:
            raise ConnectionError('Not connected')

        return self._send_queue.put(data)

    async def recv(self):
        """
        Receives a packet of data through this connection mode.

        This method returns a coroutine.
        """
        while self._connected:
            result, err = await self._recv_queue.get()
            if err:
                raise err
            if result:
                return result

        raise ConnectionError('Not connected')

    async def _send_loop(self):
        """
        This loop is constantly popping items off the queue to send them.
        """
        try:
            while self._connected:
                self._send(await self._send_queue.get())
                await self._writer.drain()
        except asyncio.CancelledError:
            pass
        except Exception as e:
            if isinstance(e, IOError):
                self._log.info('The server closed the connection while sending')
            else:
                self._log.exception('Unexpected exception in the send loop')

            await self.disconnect()

    async def _recv_loop(self):
        """
        This loop is constantly putting items on the queue as they're read.
        """
        try:
            while self._connected:
                try:
                    data = await self._recv()
                except asyncio.CancelledError:
                    break
                except (IOError, asyncio.IncompleteReadError) as e:
                    self._log.warning('Server closed the connection: %s', e)
                    await self._recv_queue.put((None, e))
                    await self.disconnect()
                except InvalidChecksumError as e:
                    self._log.warning('Server response had invalid checksum: %s', e)
                    await self._recv_queue.put((None, e))
                except InvalidBufferError as e:
                    self._log.warning('Server response had invalid buffer: %s', e)
                    await self._recv_queue.put((None, e))
                except Exception as e:
                    self._log.exception('Unexpected exception in the receive loop')
                    await self._recv_queue.put((None, e))
                    await self.disconnect()
                else:
                    await self._recv_queue.put((data, None))
        finally:
            await self.disconnect()


    def _init_conn(self):
        """
        This method will be called after `connect` is called.
        After this method finishes, the writer will be drained.

        Subclasses should make use of this if they need to send
        data to Telegram to indicate which connection mode will
        be used.
        """
        if self._codec.tag:
            self._writer.write(self._codec.tag)

    def _send(self, data):
        self._writer.write(self._codec.encode_packet(data))

    async def _recv(self):
        return await self._codec.read_packet(self._reader)

    def __str__(self):
        return '{}:{}/{}'.format(
            self._ip, self._port,
            self.__class__.__name__.replace('Connection', '')
        )


class ObfuscatedConnection(Connection):
    """
    Base class for "obfuscated" connections ("obfuscated2", "mtproto proxy")
    """
    """
    This attribute should be redefined by subclasses
    """
    obfuscated_io = None

    def _init_conn(self):
        self._obfuscation = self.obfuscated_io(self)
        self._writer.write(self._obfuscation.header)

    def _send(self, data):
        self._obfuscation.write(self._codec.encode_packet(data))

    async def _recv(self):
        return await self._codec.read_packet(self._obfuscation)


class PacketCodec(abc.ABC):
    """
    Base class for packet codecs
    """

    """
    This attribute should be re-defined by subclass to define if some
    "magic bytes" should be sent to server right after connection is made to
    signal which protocol will be used
    """
    tag = None

    def __init__(self, connection):
        """
        Codec is created when connection is just made.
        """
        self._conn = connection

    @abc.abstractmethod
    def encode_packet(self, data):
        """
        Encodes single packet and returns encoded bytes.
        """
        raise NotImplementedError

    @abc.abstractmethod
    async def read_packet(self, reader):
        """
        Reads single packet from `reader` object that should have
        `readexactly(n)` method.
        """
        raise NotImplementedError