File: utils.py

package info (click to toggle)
aiohttp-socks 0.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 164 kB
  • sloc: python: 1,016; makefile: 4
file content (102 lines) | stat: -rw-r--r-- 2,525 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import asyncio
import warnings

from python_socks import ProxyType, parse_proxy_url
from python_socks.async_.asyncio import Proxy


async def open_connection(
    proxy_url=None,
    host=None,
    port=None,
    *,
    proxy_type=ProxyType.SOCKS5,
    proxy_host='127.0.0.1',
    proxy_port=1080,
    username=None,
    password=None,
    rdns=True,
    loop=None,
    **kwargs,
):
    """Open an asyncio stream connection to ``host:port`` through a proxy.

    Deprecated: a ``DeprecationWarning`` is emitted on every call; the
    warning text points users at python-socks as the replacement.

    Args:
        proxy_url: Optional proxy URL. When given, it is parsed with
            ``parse_proxy_url`` and the result replaces ``proxy_type``,
            ``proxy_host``, ``proxy_port``, ``username`` and ``password``.
        host: Destination host handed to ``proxy.connect``. Required.
        port: Destination port handed to ``proxy.connect``. Required.
        proxy_type: Proxy type forwarded to ``Proxy.create``.
        proxy_host: Proxy host forwarded to ``Proxy.create``.
        proxy_port: Proxy port forwarded to ``Proxy.create``.
        username: Proxy username forwarded to ``Proxy.create``.
        password: Proxy password forwarded to ``Proxy.create``.
        rdns: Forwarded unchanged to ``Proxy.create`` (presumably toggles
            remote DNS resolution -- confirm against python-socks docs).
        loop: Event loop forwarded to ``Proxy.create``. Defaults to the
            currently running loop.
        **kwargs: Extra keyword arguments forwarded to
            ``asyncio.open_connection``.

    Returns:
        The result of ``asyncio.open_connection`` called with the proxied
        socket (a ``(reader, writer)`` pair).

    Raises:
        ValueError: If ``host`` or ``port`` is ``None``.
    """
    warnings.warn(
        'open_connection is deprecated. '
        'Use https://github.com/romis2012/python-socks directly instead.',
        DeprecationWarning,
        stacklevel=2,
    )

    if host is None or port is None:
        raise ValueError('host and port must be specified')  # pragma: no cover

    if loop is None:
        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the preferred way to obtain it here.
        loop = asyncio.get_running_loop()

    if proxy_url is not None:
        proxy_type, proxy_host, proxy_port, username, password = parse_proxy_url(proxy_url)

    proxy = Proxy.create(
        proxy_type=proxy_type,
        host=proxy_host,
        port=proxy_port,
        username=username,
        password=password,
        rdns=rdns,
        loop=loop,
    )

    sock = await proxy.connect(host, port)

    try:
        # noinspection PyTypeChecker
        return await asyncio.open_connection(
            host=None, port=None, sock=sock, **kwargs
        )
    except BaseException:
        # The stream was never handed to the caller (bad kwargs, handshake
        # failure, cancellation, ...), so nobody else can release the
        # already-connected proxy socket. Close it to avoid leaking the fd.
        sock.close()
        raise


async def create_connection(
    proxy_url=None,
    protocol_factory=None,
    host=None,
    port=None,
    *,
    proxy_type=ProxyType.SOCKS5,
    proxy_host='127.0.0.1',
    proxy_port=1080,
    username=None,
    password=None,
    rdns=True,
    loop=None,
    **kwargs,
):
    """Create a protocol-based connection to ``host:port`` through a proxy.

    Deprecated: a ``DeprecationWarning`` is emitted on every call; the
    warning text points users at python-socks as the replacement.

    Args:
        proxy_url: Optional proxy URL. When given, it is parsed with
            ``parse_proxy_url`` and the result replaces ``proxy_type``,
            ``proxy_host``, ``proxy_port``, ``username`` and ``password``.
        protocol_factory: Callable forwarded to ``loop.create_connection``.
            Required.
        host: Destination host handed to ``proxy.connect``. Required.
        port: Destination port handed to ``proxy.connect``. Required.
        proxy_type: Proxy type forwarded to ``Proxy.create``.
        proxy_host: Proxy host forwarded to ``Proxy.create``.
        proxy_port: Proxy port forwarded to ``Proxy.create``.
        username: Proxy username forwarded to ``Proxy.create``.
        password: Proxy password forwarded to ``Proxy.create``.
        rdns: Forwarded unchanged to ``Proxy.create`` (presumably toggles
            remote DNS resolution -- confirm against python-socks docs).
        loop: Event loop used for ``Proxy.create`` and for
            ``loop.create_connection``. Defaults to the currently running
            loop.
        **kwargs: Extra keyword arguments forwarded to
            ``loop.create_connection``.

    Returns:
        The result of ``loop.create_connection`` called with the proxied
        socket (a ``(transport, protocol)`` pair).

    Raises:
        ValueError: If ``protocol_factory`` is ``None``, or if ``host`` or
            ``port`` is ``None``.
    """
    warnings.warn(
        'create_connection is deprecated. '
        'Use https://github.com/romis2012/python-socks directly instead.',
        DeprecationWarning,
        stacklevel=2,
    )

    if protocol_factory is None:
        raise ValueError('protocol_factory must be specified')  # pragma: no cover

    if host is None or port is None:
        raise ValueError('host and port must be specified')  # pragma: no cover

    if loop is None:
        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the preferred way to obtain it here.
        loop = asyncio.get_running_loop()

    if proxy_url is not None:
        proxy_type, proxy_host, proxy_port, username, password = parse_proxy_url(proxy_url)

    proxy = Proxy.create(
        proxy_type=proxy_type,
        host=proxy_host,
        port=proxy_port,
        username=username,
        password=password,
        rdns=rdns,
        loop=loop,
    )

    sock = await proxy.connect(host, port)

    try:
        return await loop.create_connection(
            protocol_factory=protocol_factory, host=None, port=None, sock=sock, **kwargs
        )
    except BaseException:
        # The transport was never handed to the caller (bad kwargs,
        # handshake failure, cancellation, ...), so nobody else can release
        # the already-connected proxy socket. Close it to avoid leaking it.
        sock.close()
        raise