1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
import sys
from dataclasses import dataclass
import base64
import binascii
from collections import namedtuple
from typing import Optional
from .._version import __title__, __version__
from .errors import ReplyError
DEFAULT_USER_AGENT = 'Python/{0[0]}.{0[1]} {1}/{2}'.format(
sys.version_info,
__title__,
__version__,
)
CRLF = '\r\n'
class BasicAuth(namedtuple('BasicAuth', ['login', 'password', 'encoding'])):
"""Http basic authentication helper."""
def __new__(cls, login: str, password: str = '', encoding: str = 'latin1') -> 'BasicAuth':
if login is None:
raise ValueError('None is not allowed as login value')
if password is None:
raise ValueError('None is not allowed as password value')
if ':' in login:
raise ValueError('A ":" is not allowed in login (RFC 1945#section-11.1)')
# noinspection PyTypeChecker,PyArgumentList
return super().__new__(cls, login, password, encoding)
@classmethod
def decode(cls, auth_header: str, encoding: str = 'latin1') -> 'BasicAuth':
"""Create a BasicAuth object from an Authorization HTTP header."""
try:
auth_type, encoded_credentials = auth_header.split(' ', 1)
except ValueError:
raise ValueError('Could not parse authorization header.')
if auth_type.lower() != 'basic':
raise ValueError('Unknown authorization method %s' % auth_type)
try:
decoded = base64.b64decode(encoded_credentials.encode('ascii'), validate=True).decode(
encoding
)
except binascii.Error:
raise ValueError('Invalid base64 encoding.')
try:
# RFC 2617 HTTP Authentication
# https://www.ietf.org/rfc/rfc2617.txt
# the colon must be present, but the username and password may be
# otherwise blank.
username, password = decoded.split(':', 1)
except ValueError:
raise ValueError('Invalid credentials.')
# noinspection PyTypeChecker
return cls(username, password, encoding=encoding)
def encode(self) -> str:
"""Encode credentials."""
creds = ('%s:%s' % (self.login, self.password)).encode(self.encoding)
return 'Basic %s' % base64.b64encode(creds).decode(self.encoding)
class _Buffer:
def __init__(self, encoding: str = 'utf-8'):
self._encoding = encoding
self._buffer = bytearray()
def append_line(self, line: str = ""):
if line:
self._buffer.extend(line.encode(self._encoding))
self._buffer.extend(CRLF.encode('ascii'))
def dumps(self) -> bytes:
return bytes(self._buffer)
@dataclass
class ConnectRequest:
host: str
port: int
username: Optional[str]
password: Optional[str]
def dumps(self) -> bytes:
buff = _Buffer()
buff.append_line(f'CONNECT {self.host}:{self.port} HTTP/1.1')
buff.append_line(f'Host: {self.host}:{self.port}')
buff.append_line(f'User-Agent: {DEFAULT_USER_AGENT}')
if self.username and self.password:
auth = BasicAuth(self.username, self.password)
buff.append_line(f'Proxy-Authorization: {auth.encode()}')
buff.append_line()
return buff.dumps()
@dataclass
class ConnectReply:
status_code: int
message: str
@classmethod
def loads(cls, data: bytes) -> 'ConnectReply':
if not data:
raise ReplyError('Invalid proxy response') # pragma: no cover
line = data.split(CRLF.encode('ascii'), 1)[0]
line = line.decode('utf-8', 'surrogateescape')
try:
version, code, *reason = line.split()
except ValueError: # pragma: no cover
raise ReplyError(f'Invalid status line: {line}')
try:
status_code = int(code)
except ValueError: # pragma: no cover
raise ReplyError(f'Invalid status code: {code}')
status_message = " ".join(reason)
if status_code != 200:
msg = f'{status_code} {status_message}'
raise ReplyError(msg, error_code=status_code)
return cls(status_code=status_code, message=status_message)
# noinspection PyMethodMayBeStatic
class Connection:
def send(self, request: ConnectRequest) -> bytes:
return request.dumps()
def receive(self, data: bytes) -> ConnectReply:
return ConnectReply.loads(data)
|