File: _proxy_factory.py

package info (click to toggle)
electrum 4.0.9-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 35,248 kB
  • sloc: python: 222,785; sh: 165; java: 73; javascript: 10; makefile: 9
file content (59 lines) | stat: -rw-r--r-- 1,749 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from typing import Type, Union, Dict, Any
from ._types import ProxyType
from ._proxy_sync import SyncProxy
from ._proxy_async import AsyncProxy
from ._helpers import parse_proxy_url


class ProxyFactory:
    """Build proxy objects from an explicit proxy type or from a proxy URL.

    The factory does not hard-code the concrete proxy classes; it looks them
    up in the ``types`` mapping. ``types`` is only declared here, not
    assigned, so it has to be provided elsewhere (presumably by a subclass --
    confirm against the rest of the package).
    """

    # Maps a ProxyType to the class that is instantiated for that type.
    types: Dict[ProxyType, Type[Any]]

    @classmethod
    def create(cls, proxy_type: ProxyType, host: str, port: int,
               username: Union[str, None] = None,
               password: Union[str, None] = None,
               rdns: Union[bool, None] = None,
               **kwargs) -> Union[SyncProxy, AsyncProxy]:
        """Instantiate the proxy class registered for ``proxy_type``.

        Args:
            proxy_type: Which kind of proxy to build; used as the key into
                ``cls.types``.
            host: Forwarded to the proxy class as ``proxy_host``.
            port: Forwarded to the proxy class as ``proxy_port``.
            username: Forwarded as ``user_id`` for SOCKS4 and as ``username``
                for SOCKS5 and HTTP.
            password: Forwarded for SOCKS5 and HTTP; not forwarded for
                SOCKS4.
            rdns: Forwarded for SOCKS4 and SOCKS5; not forwarded for HTTP.
            **kwargs: Extra keyword arguments passed to the proxy class
                unchanged.

        Returns:
            A new instance of the class registered for ``proxy_type``.

        Raises:
            ValueError: If no class is registered for ``proxy_type`` or the
                type is not one of SOCKS4, SOCKS5 or HTTP.
        """
        proxy_cls = cls.types.get(proxy_type)

        # Without this guard a missing registration would surface below as
        # "TypeError: 'NoneType' object is not callable", which hides the
        # real problem from the caller.
        if proxy_cls is None:
            raise ValueError('Invalid proxy type: %s' % (proxy_type,))

        if proxy_type == ProxyType.SOCKS4:
            # SOCKS4 takes a user id only; the password is not passed on.
            return proxy_cls(
                proxy_host=host,
                proxy_port=port,
                user_id=username,
                rdns=rdns,
                **kwargs
            )

        if proxy_type == ProxyType.SOCKS5:
            return proxy_cls(
                proxy_host=host,
                proxy_port=port,
                username=username,
                password=password,
                rdns=rdns,
                **kwargs
            )

        if proxy_type == ProxyType.HTTP:
            # The HTTP proxy class is not given an rdns argument.
            return proxy_cls(
                proxy_host=host,
                proxy_port=port,
                username=username,
                password=password,
                **kwargs
            )

        # Reached only when a class is registered for a type that none of
        # the branches above knows how to construct.
        raise ValueError('Invalid proxy type: %s'  # pragma: no cover
                         % (proxy_type,))

    @classmethod
    def from_url(cls, url: str, **kwargs) -> Union[SyncProxy, AsyncProxy]:
        """Create a proxy from a proxy URL.

        The URL is split by ``parse_proxy_url`` into proxy type, host, port,
        username and password, which are then handed to :meth:`create`.

        Args:
            url: Proxy URL in whatever format ``parse_proxy_url`` accepts.
            **kwargs: Extra keyword arguments forwarded to :meth:`create`
                (for example ``rdns``, which is not derived from the URL
                here).

        Returns:
            The proxy instance produced by :meth:`create`.
        """
        proxy_type, host, port, username, password = parse_proxy_url(url)
        return cls.create(
            proxy_type=proxy_type,
            host=host,
            port=port,
            username=username,
            password=password,
            **kwargs
        )