File: _proto_socks5.py

package info (click to toggle)
electrum 4.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 35,248 kB
  • sloc: python: 222,785; sh: 165; java: 73; javascript: 10; makefile: 9
file content (203 lines) | stat: -rw-r--r-- 6,061 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import enum
import ipaddress
import typing
from ._helpers import is_ip_address
from ._errors import ProxyError

RSV = NULL = 0x00
SOCKS_VER = 0x05


class AuthMethod(enum.IntEnum):
    ANONYMOUS = 0x00
    GSSAPI = 0x01
    USERNAME_PASSWORD = 0x02
    NO_ACCEPTABLE = 0xff


class AddressType(enum.IntEnum):
    IPV4 = 0x01
    DOMAIN = 0x03
    IPV6 = 0x04

    @classmethod
    def from_ip_ver(cls, ver: int):
        if ver == 4:
            return cls.IPV4
        if ver == 6:
            return cls.IPV6

        raise ValueError('Invalid IP version')


class Command(enum.IntEnum):
    CONNECT = 0x01
    BIND = 0x02
    UDP_ASSOCIATE = 0x03


class ReplyCode(enum.IntEnum):
    GRANTED = 0x00
    GENERAL_FAILURE = 0x01
    CONNECTION_NOT_ALLOWED = 0x02
    NETWORK_UNREACHABLE = 0x03
    HOST_UNREACHABLE = 0x04
    CONNECTION_REFUSED = 0x05
    TTL_EXPIRED = 0x06
    COMMAND_NOT_SUPPORTED = 0x07
    ADDRESS_TYPE_NOT_SUPPORTED = 0x08


ReplyMessages = {
    ReplyCode.GRANTED: 'Request granted',
    ReplyCode.GENERAL_FAILURE: 'General SOCKS server failure',
    ReplyCode.CONNECTION_NOT_ALLOWED: 'Connection not allowed by ruleset',
    ReplyCode.NETWORK_UNREACHABLE: 'Network unreachable',
    ReplyCode.HOST_UNREACHABLE: 'Host unreachable',
    ReplyCode.CONNECTION_REFUSED: 'Connection refused by destination host',
    ReplyCode.TTL_EXPIRED: 'TTL expired',
    ReplyCode.COMMAND_NOT_SUPPORTED: 'Command not supported or protocol error',
    ReplyCode.ADDRESS_TYPE_NOT_SUPPORTED: 'Address type not supported'
}


class AuthMethodsRequest:
    def __init__(self, username: str, password: str):
        auth_methods = bytearray([AuthMethod.ANONYMOUS])

        if username and password:
            auth_methods.append(AuthMethod.USERNAME_PASSWORD)

        self.auth_methods = auth_methods

    def __bytes__(self):
        return bytes([SOCKS_VER, len(self.auth_methods)]) + self.auth_methods


class AuthMethodsResponse:
    socks_ver: int
    auth_method: AuthMethod

    def __init__(self, data: bytes):
        assert len(data) == 2
        self.socks_ver = data[0]
        self.auth_method = data[1]  # noqa

    def validate(self, request: AuthMethodsRequest):
        if self.socks_ver != SOCKS_VER:
            raise ProxyError('Unexpected '  # pragma: no cover
                             'SOCKS version number: '
                             '{}'.format(self.socks_ver))

        if self.auth_method == AuthMethod.NO_ACCEPTABLE:
            raise ProxyError('No acceptable '  # pragma: no cover
                             'authentication methods were offered')

        if self.auth_method not in request.auth_methods:
            raise ProxyError('Unexpected SOCKS '  # pragma: no cover
                             'authentication method: '
                             '{}'.format(self.auth_method))


class AuthRequest(typing.SupportsBytes):
    VER = 0x01

    def __init__(self, username: str, password: str):
        self.username = username
        self.password = password

    def __bytes__(self):
        data = bytearray()
        data.append(self.VER)
        data.append(len(self.username))
        data += self.username.encode('ascii')
        data.append(len(self.password))
        data += self.password.encode('ascii')
        return bytes(data)


class AuthResponse:
    ver: int
    reply: ReplyCode

    def __init__(self, data: bytes):
        assert len(data) == 2
        self.ver = data[0]
        self.reply = data[1]  # noqa

    def validate(self):
        if self.ver != AuthRequest.VER:
            raise ProxyError('Invalid '  # pragma: no cover
                             'authentication response')

        if self.reply != ReplyCode.GRANTED:
            raise ProxyError('Username and password '  # pragma: no cover
                             'authentication failure')


class ConnectRequest:
    def __init__(self, host: str, port: int, rdns: bool):
        self.host = host
        self.port = port
        self.rdns = rdns
        self._resolved_host = None

    def __bytes__(self):
        data = bytearray([SOCKS_VER, Command.CONNECT, RSV])
        data += self._build_addr_request()
        return bytes(data)

    @property
    def need_resolve(self):
        return not is_ip_address(self.host) and not self.rdns

    def set_resolved_host(self, value):
        self._resolved_host = value

    def _build_addr_request(self) -> bytes:
        port = self.port.to_bytes(2, 'big')

        # destination address provided is an IPv4 or IPv6 address
        if is_ip_address(self.host):
            ip = ipaddress.ip_address(self.host)
            address_type = AddressType.from_ip_ver(ip.version)
            return bytes([address_type]) + ip.packed + port

        # not IP address, probably a DNS name
        if self.rdns:
            # resolve remotely
            address_type = AddressType.DOMAIN
            host = self.host.encode('idna')
            host_len = len(host)
            return bytes([address_type, host_len]) + host + port
        else:
            assert self._resolved_host is not None
            addr = self._resolved_host
            ip = ipaddress.ip_address(addr)
            address_type = AddressType.from_ip_ver(ip.version)
            return bytes([address_type]) + ip.packed + port


class ConnectResponse:
    socks_ver: int
    reply: ReplyCode
    rsv: int

    def __init__(self, data: bytes):
        assert len(data) == 3
        self.socks_ver = data[0]
        self.reply = data[1]  # noqa
        self.rsv = data[2]

    def validate(self):
        if self.socks_ver != SOCKS_VER:
            raise ProxyError('Unexpected SOCKS '  # pragma: no cover
                             'version number: {:#02X}'.format(self.socks_ver))

        if self.reply != ReplyCode.GRANTED:
            msg = ReplyMessages.get(self.reply, 'Unknown error')
            raise ProxyError(msg, self.reply)

        if self.rsv != RSV:
            raise ProxyError('The reserved byte '  # pragma: no cover
                             'must be {:#02X}'.format(RSV))