File: _proto_socks4.py

package info (click to toggle)
electrum 4.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 35,248 kB
  • sloc: python: 222,785; sh: 165; java: 73; javascript: 10; makefile: 9
file content (98 lines) | stat: -rw-r--r-- 2,798 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import enum
import ipaddress

from ._helpers import is_ipv4_address
from ._errors import ProxyError

# Zero byte: used as address padding and as the terminator of the
# user-id and hostname fields when a request is serialized.
NULL = 0x00
# Value the first byte of a proxy reply is compared against.
RSV = NULL
# Version byte that starts every request built by this module.
SOCKS_VER = 0x04


class Command(enum.IntEnum):
    """Command byte placed in a SOCKS4 request.

    Members are ``int`` subclasses, so they can be put directly into a
    ``bytes``/``bytearray`` when the request is serialized.
    """

    CONNECT = 1  # 0x01
    BIND = 2     # 0x02


class ReplyCode(enum.IntEnum):
    """Status byte carried in the proxy's reply to a SOCKS4 request.

    The per-member notes below follow the human-readable texts in this
    module's reply-message table.
    """

    # 0x5A: request granted.
    REQUEST_GRANTED = 90
    # 0x5B: request rejected or failed.
    REQUEST_REJECTED_OR_FAILED = 91
    # 0x5C: rejected, the SOCKS server cannot connect to identd on the client.
    CONNECTION_FAILED = 92
    # 0x5D: rejected, the client program and identd report different user-ids.
    AUTHENTICATION_FAILED = 93


# Human-readable text for each reply code; used to build the error message
# when the proxy does not grant a request.
ReplyMessages = {
    ReplyCode.REQUEST_GRANTED:
        'Request granted',
    ReplyCode.REQUEST_REJECTED_OR_FAILED:
        'Request rejected or failed',
    ReplyCode.CONNECTION_FAILED: (
        'Request rejected because SOCKS server cannot connect '
        'to identd on the client'
    ),
    ReplyCode.AUTHENTICATION_FAILED: (
        'Request rejected because the client program and identd '
        'report different user-ids'
    ),
}


class ConnectRequest:
    """SOCKS4 / SOCKS4a CONNECT request, serialized with ``bytes(request)``.

    Args:
        host: Destination as an IPv4 literal or a host name.
        port: Destination port; must fit into two bytes.
        user_id: User id sent to the proxy; may be empty.
        rdns: If true, a host name is passed to the proxy for resolution
            instead of being resolved locally.
    """

    def __init__(self, host: str, port: int, user_id: str, rdns: bool):
        self.host = host
        self.port = port
        self.user_id = user_id
        self.rdns = rdns
        # Supplied later through set_resolved_host() when the caller
        # resolves the host name itself.
        self._resolved_host = None

    @property
    def need_resolve(self):
        """Whether the caller must resolve ``host`` before serializing.

        True only when ``host`` is not an IPv4 literal and ``rdns`` is off.
        """
        if is_ipv4_address(self.host):
            return False
        return not self.rdns

    def set_resolved_host(self, value):
        """Store the locally resolved IPv4 address for ``host``."""
        self._resolved_host = value

    def __bytes__(self):
        """Build the request packet.

        Layout: version, command, 2-byte big-endian port, 4-byte address,
        optional ASCII user id, NULL terminator and, for remote
        resolution only, the IDNA-encoded host name plus another NULL.
        """
        port_field = self.port.to_bytes(2, 'big')
        hostname_follows = False

        if is_ipv4_address(self.host):
            addr_field = ipaddress.IPv4Address(self.host).packed
        elif self.rdns:
            # Remote resolve (SOCKS4a): send the placeholder address
            # 0.0.0.1 and append the host name after the user id.
            hostname_follows = True
            addr_field = bytes([NULL, NULL, NULL, 1])
        else:
            # Local resolve: the caller must have provided the address
            # through set_resolved_host() beforehand.
            assert self._resolved_host is not None
            addr_field = ipaddress.IPv4Address(self._resolved_host).packed

        packet = bytearray([SOCKS_VER, Command.CONNECT])
        packet += port_field
        packet += addr_field

        if self.user_id:
            packet += self.user_id.encode('ascii')
        packet.append(NULL)

        if hostname_follows:
            packet += self.host.encode('idna')
            packet.append(NULL)

        return bytes(packet)


class ConnectResponse:
    """Two-byte reply from a SOCKS4 proxy: a reserved byte and a status code.

    Args:
        data: Exactly two bytes; ``data[0]`` is stored as ``rsv`` and
            ``data[1]`` as ``reply``.

    Raises:
        ProxyError: If ``data`` is not exactly two bytes long.
    """
    rsv: int
    reply: ReplyCode

    def __init__(self, data: bytes):
        # ``data`` originates from the proxy, so check it with a real
        # exception: an ``assert`` is stripped under ``python -O`` and a
        # short reply would then surface as an unrelated IndexError.
        if len(data) != 2:
            raise ProxyError('SOCKS4 proxy server sent invalid data')
        self.rsv = data[0]
        # Kept as the raw integer so that codes unknown to ReplyCode can
        # still be reported by validate().
        self.reply = data[1]  # noqa

    def validate(self):
        """Check that the proxy granted the request.

        Raises:
            ProxyError: If the reserved byte is not ``RSV``, or if the
                reply code is anything other than
                ``ReplyCode.REQUEST_GRANTED``. In the latter case the
                reply code is passed as the second argument.
        """
        if self.rsv != RSV:  # pragma: no cover
            raise ProxyError('SOCKS4 proxy server sent invalid data')

        if self.reply != ReplyCode.REQUEST_GRANTED:  # pragma: no cover
            msg = ReplyMessages.get(self.reply, 'Unknown error')
            raise ProxyError(msg, self.reply)