1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
|
# SPDX-License-Identifier: GPL-3.0-or-later
import itertools
import re
from collections.abc import Generator
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from urllib.parse import urlparse
from .download.url import URL
class TokenType(Enum):
MACHINE = "machine"
LOGIN = "login"
PASSWORD = "password"
@dataclass
class Token:
token: TokenType
value: str
@dataclass(frozen=True)
class Machine:
protocol: str | None
hostname: str | None
port: int | None
path: str | None
@classmethod
def from_string(cls, string: str):
if "//" not in string:
string = f"//{string}"
url = urlparse(string)
return cls(
protocol=url.scheme, hostname=url.hostname, port=url.port, path=url.path
)
class NetRCFile:
def __init__(self, path: Path) -> None:
self._path = path
def tokens(self) -> Generator[Token, None, None]:
if not self._path.is_file():
return
with open(self._path, "rt", encoding="utf-8") as fp:
for line in fp:
parts = (t for t in re.split(r"[ \t\n\r]", line) if t)
try:
while True:
try:
token = TokenType(next(parts))
except ValueError:
next(parts)
continue
yield Token(token, next(parts))
except (StopIteration, ValueError):
continue
class NetRC:
def __init__(self, netrc_path: Path = Path("/etc/apt/auth.conf")) -> None:
self._machines: dict[Machine, tuple[str, str]] = {}
self._reset_machine()
for file in itertools.chain(
[netrc_path],
netrc_path.with_name(f"{netrc_path.name}.d").glob("*.conf"),
):
self._process_file(file)
def _process_file(self, file: Path) -> None:
for token in NetRCFile(file).tokens():
match token.token:
case TokenType.MACHINE:
self._add_machine()
self._machine = Machine.from_string(token.value)
case TokenType.LOGIN:
self._login = token.value
case TokenType.PASSWORD:
self._password = token.value
self._add_machine()
def _add_machine(self) -> None:
if self._machine and self._login and self._password:
self._machines[self._machine] = (self._login, self._password)
self._reset_machine()
def _reset_machine(self) -> None:
self._machine: Machine | None = None
self._login: str | None = None
self._password: str | None = None
def match_machine(self, url: URL) -> tuple[str, str] | None:
for machine, auth in self._machines.items():
if machine.protocol != url.scheme and (
machine.protocol or url.scheme not in ("https", "tor+https")
):
continue
if machine.hostname != url.hostname:
continue
if machine.port and machine.port != url.port:
continue
if machine.path and not url.path.startswith(machine.path):
continue
return auth
return None
|