import abc
import argparse
import base64
import binascii
import hashlib
import json
import os
import os.path
import re
import shlex
import stat
import uuid
from pathlib import Path
from typing import ClassVar, Any, TYPE_CHECKING, Literal
from collections import OrderedDict
from datetime import datetime, timezone
from functools import partial
from string import Formatter

from ..logger import create_logger

logger = create_logger()

from .errors import Error
from .fs import get_keys_dir, make_path_safe
from .msgpack import Timestamp
from .time import OutputTimestamp, format_time, safe_timestamp
from .. import __version__ as borg_version
from .. import __version_tuple__ as borg_version_tuple
from ..constants import *  # NOQA

if TYPE_CHECKING:
    from ..item import ItemDiff


def bin_to_hex(binary):
    return binascii.hexlify(binary).decode("ascii")


def hex_to_bin(hex, length=None):
    try:
        binary = binascii.unhexlify(hex)
        binary_len = len(binary)
        if length is not None and binary_len != length:
            raise ValueError(f"Expected {length} bytes ({2 * length} hex digits), got {binary_len} bytes.")
    except binascii.Error as e:
        raise ValueError(str(e)) from None
    return binary
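

# Illustrative usage (a sketch of the behavior above, not executed):
#   hex_to_bin("af0123")         == b"\xaf\x01\x23"
#   hex_to_bin("af01", length=3) raises ValueError("Expected 3 bytes (6 hex digits), got 2 bytes.")
#   hex_to_bin("zz")             raises ValueError (re-raised from binascii.Error)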


def safe_decode(s, coding="utf-8", errors="surrogateescape"):
    """Decode bytes to str, with round-tripping of "invalid" bytes."""
    if s is None:
        return None
    return s.decode(coding, errors)


def safe_encode(s, coding="utf-8", errors="surrogateescape"):
    """Encode str to bytes, with round-tripping of "invalid" bytes."""
    if s is None:
        return None
    return s.encode(coding, errors)


def remove_surrogates(s, errors="replace"):
    """Replace surrogates generated by fsdecode with '?'."""
    return s.encode("utf-8", errors).decode("utf-8")


def binary_to_json(key, value):
    assert isinstance(key, str)
    assert isinstance(value, bytes)
    return {key + "_b64": base64.b64encode(value).decode("ascii")}


def text_to_json(key, value):
    """
    Return a dict made from key/value that can be fed safely into a JSON encoder.

    JSON can only contain pure, valid unicode (but not: unicode with surrogate escapes).

    But sometimes we have to deal with such values and we do it like this:
    - <key>: value as pure unicode text (surrogate escapes, if any, replaced by ?)
    - <key>_b64: value as base64 encoded binary representation (only set if value has surrogate-escapes)
    """
    coding = "utf-8"
    assert isinstance(key, str)
    assert isinstance(value, str)  # str might contain surrogate escapes
    data = {}
    try:
        value.encode(coding, errors="strict")  # check if pure unicode
    except UnicodeEncodeError:
        # value has surrogate escape sequences
        data[key] = remove_surrogates(value)
        value_bytes = value.encode(coding, errors="surrogateescape")
        data |= binary_to_json(key, value_bytes)
    else:
        # value is pure unicode
        data[key] = value
        # we do not give the b64 representation, not needed
    return data
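

# Illustrative results (the b64 value is the base64 encoding of b"b\xffad"):
#   text_to_json("path", "ok")        == {"path": "ok"}
#   text_to_json("path", "b\udcffad") == {"path": "b?ad", "path_b64": "Yv9hZA=="}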


def join_cmd(argv, rs=False):
    cmd = shlex.join(argv)
    return remove_surrogates(cmd) if rs else cmd


def eval_escapes(s):
    """Evaluate literal escape sequences in a string (e.g. `\\n` -> `\n`)."""
    return s.encode("ascii", "backslashreplace").decode("unicode-escape")


def decode_dict(d, keys, encoding="utf-8", errors="surrogateescape"):
    for key in keys:
        if isinstance(d.get(key), bytes):
            d[key] = d[key].decode(encoding, errors)
    return d


def positive_int_validator(value):
    """argparse type for positive integers."""
    int_value = int(value)
    if int_value <= 0:
        raise argparse.ArgumentTypeError("A positive integer is required: %s" % value)
    return int_value


def interval(s):
    """Convert a string representing a valid interval to a number of seconds."""
    seconds_in_a_minute = 60
    seconds_in_an_hour = 60 * seconds_in_a_minute
    seconds_in_a_day = 24 * seconds_in_an_hour
    seconds_in_a_week = 7 * seconds_in_a_day
    seconds_in_a_month = 31 * seconds_in_a_day
    seconds_in_a_year = 365 * seconds_in_a_day
    multiplier = dict(
        y=seconds_in_a_year,
        m=seconds_in_a_month,
        w=seconds_in_a_week,
        d=seconds_in_a_day,
        H=seconds_in_an_hour,
        M=seconds_in_a_minute,
        S=1,
    )

    if s.endswith(tuple(multiplier.keys())):
        number = s[:-1]
        suffix = s[-1]
    else:
        raise argparse.ArgumentTypeError(f'Unexpected time unit in "{s}": choose from {", ".join(multiplier)}')

    try:
        seconds = int(number) * multiplier[suffix]
    except ValueError:
        seconds = -1

    if seconds <= 0:
        raise argparse.ArgumentTypeError(f'Invalid number "{number}": expected positive integer')

    return seconds
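

# Illustrative values, derived from the multiplier table above:
#   interval("2d") == 2 * 86400 == 172800
#   interval("1m") == 31 * 86400 == 2678400   (a "month" is fixed at 31 days here)
#   interval("12") raises ArgumentTypeError (no time unit suffix given)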


def ChunkerParams(s):
    params = s.strip().split(",")
    count = len(params)
    if count == 0:
        raise argparse.ArgumentTypeError("no chunker params given")
    algo = params[0].lower()
    if algo == CH_FAIL and count == 3:
        block_size = int(params[1])
        fail_map = str(params[2])
        return algo, block_size, fail_map
    if algo == CH_FIXED and 2 <= count <= 3:  # fixed, block_size[, header_size]
        block_size = int(params[1])
        header_size = int(params[2]) if count == 3 else 0
        if block_size < 64:
            # we are only disallowing the most extreme cases of abuse here - this does NOT imply
            # that cutting chunks of the minimum allowed size is efficient concerning storage
            # or in-memory chunk management.
            # choose the block (chunk) size wisely: if you have a lot of data and you cut
            # it into very small chunks, you are asking for trouble!
            raise argparse.ArgumentTypeError("block_size must not be less than 64 Bytes")
        if block_size > MAX_DATA_SIZE or header_size > MAX_DATA_SIZE:
            raise argparse.ArgumentTypeError(
                "block_size and header_size must not exceed MAX_DATA_SIZE [%d]" % MAX_DATA_SIZE
            )
        return algo, block_size, header_size
    if algo == "default" and count == 1:  # default
        return CHUNKER_PARAMS
    if algo == CH_BUZHASH64 and count == 5:  # buzhash64, chunk_min, chunk_max, chunk_mask, window_size
        chunk_min, chunk_max, chunk_mask, window_size = (int(p) for p in params[1:])
        if not (chunk_min <= chunk_mask <= chunk_max):
            raise argparse.ArgumentTypeError("required: chunk_min <= chunk_mask <= chunk_max")
        if chunk_min < 6:
            # see comment in 'fixed' algo check
            raise argparse.ArgumentTypeError(
                "min. chunk size exponent must not be less than 6 (2^6 = 64B min. chunk size)"
            )
        if chunk_max > 23:
            raise argparse.ArgumentTypeError(
                "max. chunk size exponent must not be more than 23 (2^23 = 8MiB max. chunk size)"
            )
        # note that for buzhash64, there is no problem with even window_size.
        return CH_BUZHASH64, chunk_min, chunk_max, chunk_mask, window_size
    # this must stay last as it deals with old-style compat mode (no algorithm, 4 params, buzhash):
    if (algo == CH_BUZHASH and count == 5) or count == 4:  # [buzhash, ]chunk_min, chunk_max, chunk_mask, window_size
        chunk_min, chunk_max, chunk_mask, window_size = (int(p) for p in params[count - 4 :])
        if not (chunk_min <= chunk_mask <= chunk_max):
            raise argparse.ArgumentTypeError("required: chunk_min <= chunk_mask <= chunk_max")
        if chunk_min < 6:
            # see comment in 'fixed' algo check
            raise argparse.ArgumentTypeError(
                "min. chunk size exponent must not be less than 6 (2^6 = 64B min. chunk size)"
            )
        if chunk_max > 23:
            raise argparse.ArgumentTypeError(
                "max. chunk size exponent must not be more than 23 (2^23 = 8MiB max. chunk size)"
            )
        if window_size % 2 == 0:
            raise argparse.ArgumentTypeError("window_size must be an uneven (odd) number")
        return CH_BUZHASH, chunk_min, chunk_max, chunk_mask, window_size
    raise argparse.ArgumentTypeError("invalid chunker params")


def FilesCacheMode(s):
    ENTRIES_MAP = dict(ctime="c", mtime="m", size="s", inode="i", rechunk="r", disabled="d")
    VALID_MODES = ("cis", "ims", "cs", "ms", "cr", "mr", "d", "s")  # letters in alpha order
    entries = set(s.strip().split(","))
    if not entries <= set(ENTRIES_MAP):
        raise argparse.ArgumentTypeError(
            "cache mode must be a comma-separated list of: %s" % ",".join(sorted(ENTRIES_MAP))
        )
    short_entries = {ENTRIES_MAP[entry] for entry in entries}
    mode = "".join(sorted(short_entries))
    if mode not in VALID_MODES:
        raise argparse.ArgumentTypeError("cache mode short must be one of: %s" % ",".join(VALID_MODES))
    return mode
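

# Illustrative: FilesCacheMode("ctime,size,inode") == "cis"
#               FilesCacheMode("disabled")         == "d"
#               FilesCacheMode("mtime,inode") raises, because the short mode "im"
#               is not in VALID_MODES.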


def partial_format(format, mapping):
    """
    Apply format.format_map(mapping) while preserving unknown keys

    Does not support attribute access, indexing and ![rsa] conversions
    """
    for key, value in mapping.items():
        key = re.escape(key)
        format = re.sub(
            rf"(?<!\{{)((\{{{key}\}})|(\{{{key}:[^\}}]*\}}))", lambda match: match.group(1).format_map(mapping), format
        )
    return format
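

# Illustrative: partial_format("{hostname}-{now}", {"hostname": "foo"}) == "foo-{now}"
# (known keys are substituted, unknown placeholders stay intact for a later pass).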


class DatetimeWrapper:
    def __init__(self, dt):
        self.dt = dt

    def __format__(self, format_spec):
        if format_spec == "":
            format_spec = ISO_FORMAT_NO_USECS
        return self.dt.__format__(format_spec)


class PlaceholderError(Error):
    """Formatting Error: "{}".format({}): {}({})"""

    exit_mcode = 5


class InvalidPlaceholder(PlaceholderError):
    """Invalid placeholder "{}" in string: {}"""

    exit_mcode = 6


def format_line(format, data):
    for _, key, _, conversion in Formatter().parse(format):
        if not key:
            continue
        if conversion or key not in data:
            raise InvalidPlaceholder(key, format)
    try:
        return format.format_map(data)
    except Exception as e:
        raise PlaceholderError(format, data, e.__class__.__name__, str(e))


def _replace_placeholders(text, overrides={}):
    """Replace placeholders in text with their values."""
    from ..platform import fqdn, hostname, getosusername

    current_time = datetime.now(timezone.utc)
    data = {
        "pid": os.getpid(),
        "fqdn": fqdn,
        "reverse-fqdn": ".".join(reversed(fqdn.split("."))),
        "hostname": hostname,
        "now": DatetimeWrapper(current_time.astimezone()),
        "utcnow": DatetimeWrapper(current_time),
        "unixtime": int(current_time.timestamp()),
        "user": getosusername(),
        "uuid4": str(uuid.uuid4()),
        "borgversion": borg_version,
        "borgmajor": "%d" % borg_version_tuple[:1],
        "borgminor": "%d.%d" % borg_version_tuple[:2],
        "borgpatch": "%d.%d.%d" % borg_version_tuple[:3],
        **overrides,
    }
    return format_line(text, data)


class PlaceholderReplacer:
    def __init__(self):
        self.reset()

    def override(self, key, value):
        self.overrides[key] = value

    def reset(self):
        self.overrides = {}

    def __call__(self, text, overrides=None):
        ovr = self.overrides | (overrides or {})
        return _replace_placeholders(text, overrides=ovr)


replace_placeholders = PlaceholderReplacer()
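
# Illustrative (output depends on the actual host and time): replace_placeholders(
# "{hostname}-{utcnow:%Y-%m-%dT%H:%M:%S}") might yield "myhost-2025-01-31T12:00:00";
# unknown placeholders and !r/!s/!a conversions raise InvalidPlaceholder.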


def PathSpec(text):
    if not text:
        raise argparse.ArgumentTypeError("Empty strings are not accepted as paths.")
    return text


def SortBySpec(text):
    from ..manifest import AI_HUMAN_SORT_KEYS

    for token in text.split(","):
        if token not in AI_HUMAN_SORT_KEYS:
            raise argparse.ArgumentTypeError("Invalid sort key: %s" % token)
    return text.replace("timestamp", "ts").replace("archive", "name")


def format_file_size(v, precision=2, sign=False, iec=False):
    """Format file size into a human friendly format"""
    fn = sizeof_fmt_iec if iec else sizeof_fmt_decimal
    return fn(v, suffix="B", sep=" ", precision=precision, sign=sign)


class FileSize(int):
    def __new__(cls, value, iec=False):
        obj = int.__new__(cls, value)
        obj.iec = iec
        return obj

    def __format__(self, format_spec):
        return format_file_size(int(self), iec=self.iec).__format__(format_spec)


def parse_file_size(s):
    """Return int from file size (1234, 55G, 1.7T)."""
    if not s:
        return int(s)  # will raise
    s = s.upper()
    suffix = s[-1]
    power = 1000
    try:
        factor = {"K": power, "M": power**2, "G": power**3, "T": power**4, "P": power**5}[suffix]
        s = s[:-1]
    except KeyError:
        factor = 1
    return int(float(s) * factor)


def sizeof_fmt(num, suffix="B", units=None, power=None, sep="", precision=2, sign=False):
    sign = "+" if sign and num > 0 else ""
    fmt = "{0:{1}.{2}f}{3}{4}{5}"
    prec = 0
    for unit in units[:-1]:
        if abs(round(num, precision)) < power:
            break
        num /= float(power)
        prec = precision
    else:
        unit = units[-1]
    return fmt.format(num, sign, prec, sep, unit, suffix)


def sizeof_fmt_iec(num, suffix="B", sep="", precision=2, sign=False):
    return sizeof_fmt(
        num,
        suffix=suffix,
        sep=sep,
        precision=precision,
        sign=sign,
        units=["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi"],
        power=1024,
    )


def sizeof_fmt_decimal(num, suffix="B", sep="", precision=2, sign=False):
    return sizeof_fmt(
        num,
        suffix=suffix,
        sep=sep,
        precision=precision,
        sign=sign,
        units=["", "k", "M", "G", "T", "P", "E", "Z", "Y"],
        power=1000,
    )
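

# Illustrative outputs (via format_file_size, which uses sep=" " and suffix="B"):
#   format_file_size(1234567)           == "1.23 MB"   (decimal, power 1000)
#   format_file_size(1234567, iec=True) == "1.18 MiB"  (IEC, power 1024)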


def format_archive(archive):
    return "%-36s %s [%s]" % (archive.name, format_time(archive.ts), bin_to_hex(archive.id))


def parse_stringified_list(s):
    items = re.split(" *, *", s)
    return [item for item in items if item != ""]


class Location:
    """Object representing a repository location"""

    # user@ (optional)
    # user must not contain "@", ":" or "/".
    # Quoting adduser error message:
    # "To avoid problems, the username should consist only of letters, digits,
    # underscores, periods, at signs and dashes, and not start with a dash
    # (as defined by IEEE Std 1003.1-2001)."
    # We use "@" as separator between username and hostname, so we must
    # disallow it within the pure username part.
    optional_user_re = r"(?:(?P<user>[^@:/]+)@)?"

    # host NAME, or host IP ADDRESS (v4 or v6, v6 must be in square brackets)
    host_re = r"""
        (?P<host>(
            (?!\[)[^:/]+(?<!\])     # hostname or v4 addr, not containing : or / (does not match v6 addr: no brackets!)
            |
            \[[0-9a-fA-F:.]+\])     # ipv6 address in brackets
        )
    """

    # :port (optional)
    optional_port_re = r"(?::(?P<port>\d+))?"

    # path may contain any chars. to avoid ambiguities with other regexes,
    # it must not start with "//", nor with "scheme://", nor with "rclone:", "s3:" or "b2:".
    local_path_re = r"""
        (?!(//|(ssh|socket|sftp|file)://|(rclone|s3|b2):))
        (?P<path>.+)
    """

    # abs_path must start with a slash.
    abs_path_re = r"(?P<path>/.+)"

    # path may or may not start with a slash.
    abs_or_rel_path_re = r"(?P<path>.+)"

    # regexes for misc. kinds of supported location specifiers:
    ssh_or_sftp_re = re.compile(
        r"(?P<proto>(ssh|sftp))://"
        + optional_user_re
        + host_re
        + optional_port_re
        + r"/"  # this is the separator, not part of the path!
        + abs_or_rel_path_re,
        re.VERBOSE,
    )

    # (s3|b2):[profile|(access_key_id:access_key_secret)@][scheme://hostname[:port]]/bucket/path
    s3_re = re.compile(
        r"""
        (?P<s3type>(s3|b2)):
        ((
            (?P<profile>[^@:]+)  # profile (no colons allowed)
            |
            (?P<access_key_id>[^:@]+):(?P<access_key_secret>[^@]+)  # access key and secret
        )@)?  # optional authentication
        (
            [^:/]+://  # scheme
            (?P<hostname>[^:/]+)
            (:(?P<port>\d+))?
        )?  # optional endpoint
        /
        (?P<bucket>[^/]+)/  # bucket name
        (?P<path>.+)  # path
    """,
        re.VERBOSE,
    )

    rclone_re = re.compile(r"(?P<proto>rclone):(?P<path>(.*))", re.VERBOSE)

    file_or_socket_re = re.compile(r"(?P<proto>(file|socket))://" + abs_path_re, re.VERBOSE)

    local_re = re.compile(local_path_re, re.VERBOSE)
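
    # Illustrative inputs matched by the regexes above (host/path values are made up):
    #   "ssh://user@host:2222/path/to/repo"                  -> ssh_or_sftp_re
    #   "s3:profile@https://s3.example.org:9000/bucket/repo" -> s3_re
    #   "rclone:remote:path"                                 -> rclone_re
    #   "file:///abs/path/repo"                              -> file_or_socket_re
    #   "relative/path/repo"                                 -> local_re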

    def __init__(self, text="", overrides={}, other=False):
        self.repo_env_var = "BORG_OTHER_REPO" if other else "BORG_REPO"
        self.valid = False
        self.proto = None
        self.user = None
        self._pass = None
        self._host = None
        self.port = None
        self.path = None
        self.raw = None
        self.processed = None
        self.parse(text, overrides)

    def parse(self, text, overrides={}):
        if not text:
            # we did not get a text to parse, so we try to fetch from the environment
            text = os.environ.get(self.repo_env_var)
            if not text:  # None or ""
                return

        self.raw = text  # as given by user, might contain placeholders
        self.processed = replace_placeholders(self.raw, overrides)  # after placeholder replacement
        valid = self._parse(self.processed)
        if valid:
            self.valid = True
        else:
            raise ValueError('Invalid location format: "%s"' % self.processed)

    def _parse(self, text):
        m = self.ssh_or_sftp_re.match(text)
        if m:
            self.proto = m.group("proto")
            self.user = m.group("user")
            self._host = m.group("host")
            self.port = m.group("port") and int(m.group("port")) or None
            self.path = os.path.normpath(m.group("path"))
            return True
        m = self.rclone_re.match(text)
        if m:
            self.proto = m.group("proto")
            self.path = m.group("path")
            return True
        m = self.file_or_socket_re.match(text)
        if m:
            self.proto = m.group("proto")
            self.path = os.path.normpath(m.group("path"))
            return True
        m = self.s3_re.match(text)
        if m:
            self.proto = m.group("s3type")
            self.user = m.group("profile") if m.group("profile") else m.group("access_key_id")
            self._pass = True if m.group("access_key_secret") else False
            self._host = m.group("hostname")
            self.port = m.group("port") and int(m.group("port")) or None
            self.path = m.group("bucket") + "/" + m.group("path")
            return True
        m = self.local_re.match(text)
        if m:
            self.proto = "file"
            self.path = os.path.abspath(os.path.normpath(m.group("path")))
            return True
        return False

    def __str__(self):
        items = [
            "proto=%r" % self.proto,
            "user=%r" % self.user,
            "pass=%r" % ("REDACTED" if self._pass else None),
            "host=%r" % self.host,
            "port=%r" % self.port,
            "path=%r" % self.path,
        ]
        return ", ".join(items)

    def to_key_filename(self):
        name = re.sub(r"[^\w]", "_", self.path.rstrip("/"))
        if self.proto not in ("file", "socket", "rclone"):
            name = re.sub(r"[^\w]", "_", self.host) + "__" + name
        if len(name) > 120:
            # Limit file names to some reasonable length. Most file systems
            # limit them to 255 [unit of choice]; due to variations in unicode
            # handling we truncate to 120 *characters*.
            name = name[:120]
        return os.path.join(get_keys_dir(), name)

    def __repr__(self):
        return "Location(%s)" % self

    @property
    def host(self):
        # strip square brackets used for IPv6 addrs
        if self._host is not None:
            return self._host.lstrip("[").rstrip("]")

    def canonical_path(self):
        if self.proto in ("file", "socket"):
            return self.path
        if self.proto == "rclone":
            return f"{self.proto}:{self.path}"
        if self.proto in ("sftp", "ssh", "s3", "b2"):
            return (
                f"{self.proto}://"
                f"{(self.user + '@') if self.user else ''}"
                f"{self._host if self._host else ''}"
                f"{self.port if self.port else ''}/"
                f"{self.path}"
            )
        raise NotImplementedError(self.proto)

    def with_timestamp(self, timestamp):
        # note: this only affects the repository URL/path, not the archive name!
        return Location(
            self.raw,
            overrides={
                "now": DatetimeWrapper(timestamp),
                "utcnow": DatetimeWrapper(timestamp.astimezone(timezone.utc)),
            },
        )


def location_validator(proto=None, other=False):
    def validator(text):
        try:
            loc = Location(text, other=other)
        except ValueError as err:
            raise argparse.ArgumentTypeError(str(err)) from None
        if proto is not None and loc.proto != proto:
            if proto == "file":
                raise argparse.ArgumentTypeError('"%s": Repository must be local' % text)
            else:
                raise argparse.ArgumentTypeError('"%s": Repository must be remote' % text)
        return loc

    return validator


def relative_time_marker_validator(text: str):
    time_marker_regex = r"^\d+[ymwdHMS]$"
    match = re.compile(time_marker_regex).search(text)
    if not match:
        raise argparse.ArgumentTypeError(f"Invalid relative time marker used: {text}, choose from y, m, w, d, H, M, S")
    else:
        return text


def text_validator(*, name, max_length, min_length=0, invalid_ctrl_chars="\0", invalid_chars="", no_blanks=False):
    def validator(text):
        assert isinstance(text, str)
        if len(text) < min_length:
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [length < {min_length}]')
        if len(text) > max_length:
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [length > {max_length}]')
        if invalid_ctrl_chars and re.search(f"[{re.escape(invalid_ctrl_chars)}]", text):
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [invalid control chars detected]')
        if invalid_chars and re.search(f"[{re.escape(invalid_chars)}]", text):
            raise argparse.ArgumentTypeError(
                f'Invalid {name}: "{text}" [invalid chars detected matching "{invalid_chars}"]'
            )
        if no_blanks and (text.startswith(" ") or text.endswith(" ")):
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [leading or trailing blanks detected]')
        try:
            text.encode("utf-8", errors="strict")
        except UnicodeEncodeError:
            # looks like text contains surrogate-escapes
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [contains non-unicode characters]')
        return text

    return validator


comment_validator = text_validator(name="comment", max_length=10000)
tag_validator = text_validator(name="tag", min_length=0, max_length=10, invalid_chars=" ,$")


def archivename_validator(text):
    # we make sure that the archive name can be used as directory name (for borg mount)
    MAX_PATH = 260  # Windows default. Since Win10, there is a registry setting LongPathsEnabled to get more.
    MAX_DIRNAME = MAX_PATH - len("12345678.123")
    SAFETY_MARGIN = 48  # borgfs path: mountpoint / archivename / dir / dir / ... / file
    MAX_ARCHIVENAME = MAX_DIRNAME - SAFETY_MARGIN
    invalid_ctrl_chars = "".join(chr(i) for i in range(32))
    # note: ":" is also an invalid path char on windows, but we can not blacklist it,
    # because e.g. our {now} placeholder creates ISO-8601 like output like 2022-12-10T20:47:42 .
    invalid_chars = r"/" + r"\"<|>?*"  # posix + windows
    validate_text = text_validator(
        name="archive name",
        min_length=1,
        max_length=MAX_ARCHIVENAME,
        invalid_ctrl_chars=invalid_ctrl_chars,
        invalid_chars=invalid_chars,
        no_blanks=True,
    )
    return validate_text(text)
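

# Illustrative: archivename_validator("docs-2022-12-10T20:47:42") passes, while names
# containing "/", "*" or control chars, or with leading/trailing blanks, are rejected.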


class BaseFormatter(metaclass=abc.ABCMeta):
    format: str
    static_data: dict[str, Any]
    FIXED_KEYS: ClassVar[dict[str, str]] = {
        # Formatting aids
        "LF": "\n",
        "SPACE": " ",
        "TAB": "\t",
        "CR": "\r",
        "NUL": "\0",
        "NEWLINE": "\n",
        "NL": "\n",  # \n is automatically converted to os.linesep on write
    }
    KEY_DESCRIPTIONS: ClassVar[dict[str, str]] = {
        "NEWLINE": "OS dependent line separator",
        "NL": "alias of NEWLINE",
        "NUL": "NUL character for creating print0 / xargs -0 like output",
        "SPACE": "space character",
        "TAB": "tab character",
        "CR": "carriage return character",
        "LF": "line feed character",
    }
    KEY_GROUPS: ClassVar[tuple[tuple[str, ...], ...]] = (("NEWLINE", "NL", "NUL", "SPACE", "TAB", "CR", "LF"),)

    def __init__(self, format: str, static: dict[str, Any]) -> None:
        self.format = partial_format(format, static)
        self.static_data = static

    @abc.abstractmethod
    def get_item_data(self, item, jsonline=False) -> dict:
        raise NotImplementedError

    def format_item(self, item, jsonline=False, sort=False):
        data = self.get_item_data(item, jsonline)
        return (
            f"{json.dumps(data, cls=BorgJsonEncoder, sort_keys=sort)}\n" if jsonline else self.format.format_map(data)
        )

    @classmethod
    def keys_help(cls):
        help = []
        keys: set[str] = set()
        keys.update(cls.KEY_DESCRIPTIONS.keys())
        keys.update(key for group in cls.KEY_GROUPS for key in group)

        for group in cls.KEY_GROUPS:
            for key in group:
                keys.remove(key)
                text = "- " + key
                if key in cls.KEY_DESCRIPTIONS:
                    text += ": " + cls.KEY_DESCRIPTIONS[key]
                help.append(text)
            help.append("")
        assert not keys, str(keys)
        return "\n".join(help)


class ArchiveFormatter(BaseFormatter):
    KEY_DESCRIPTIONS = {
        "archive": "archive name",
        "name": 'alias of "archive"',
        "comment": "archive comment",
        "tags": "archive tags",
        # *start* is the key used by borg-info for this timestamp, this makes the formats more compatible
        "start": "time (start) of creation of the archive",
        "time": 'alias of "start"',
        "end": "time (end) of creation of the archive",
        "command_line": "command line which was used to create the archive",
        "id": "internal ID of the archive",
        "hostname": "hostname of host on which this archive was created",
        "username": "username of user who created this archive",
        "size": "size of this archive (data plus metadata, not considering compression and deduplication)",
        "nfiles": "count of files in this archive",
    }
    KEY_GROUPS = (
        ("archive", "name", "comment", "id", "tags"),
        ("start", "time", "end", "command_line"),
        ("hostname", "username"),
        ("size", "nfiles"),
    )

    def __init__(self, format, repository, manifest, key, *, iec=False, deleted=False):
        static_data = {} | self.FIXED_KEYS  # here could be stuff on repo level, above archive level
        super().__init__(format, static_data)
        self.repository = repository
        self.manifest = manifest
        self.key = key
        self.name = None
        self.id = None
        self._archive = None
        self.deleted = deleted  # True if we want to deal with deleted archives.
        self.iec = iec
        self.format_keys = {f[1] for f in Formatter().parse(format)}
        self.call_keys = {
            "hostname": partial(self.get_meta, "hostname", ""),
            "username": partial(self.get_meta, "username", ""),
            "comment": partial(self.get_meta, "comment", ""),
            "command_line": partial(self.get_meta, "command_line", ""),
            "size": partial(self.get_meta, "size", 0),
            "nfiles": partial(self.get_meta, "nfiles", 0),
            "end": self.get_ts_end,
            "tags": self.get_tags,
        }
        self.used_call_keys = set(self.call_keys) & self.format_keys

    def get_item_data(self, archive_info, jsonline=False):
        self.name = archive_info.name
        self.id = archive_info.id
        item_data = {}
        item_data |= {} if jsonline else self.static_data
        item_data |= {
            "name": archive_info.name,
            "archive": archive_info.name,
            "id": bin_to_hex(archive_info.id),
            "time": self.format_time(archive_info.ts),
            "start": self.format_time(archive_info.ts),
        }
        for key in self.used_call_keys:
            item_data[key] = self.call_keys[key]()

        # Note: name and comment are validated, should never contain surrogate escapes.
        # But unsure whether hostname, username, command_line could contain surrogate escapes, play safe:
        for key in "hostname", "username", "command_line":
            if key in item_data:
                item_data |= text_to_json(key, item_data[key])
        return item_data

    @property
    def archive(self):
        """lazy load / update loaded archive"""
        if self._archive is None or self._archive.id != self.id:
            from ..archive import Archive

            self._archive = Archive(self.manifest, self.id, iec=self.iec, deleted=self.deleted)
        return self._archive

    def get_meta(self, key, default=None):
        return self.archive.metadata.get(key, default)

    def get_ts_end(self):
        return self.format_time(self.archive.ts_end)

    def format_time(self, ts):
        return OutputTimestamp(ts)

    def get_tags(self):
        return ",".join(sorted(self.archive.tags))


class ItemFormatter(BaseFormatter):
    # we provide the hash algos from python stdlib (except shake_*) and additionally xxh64.
    # shake_* is not provided because it uses an incompatible .digest() method to support variable length.
    hash_algorithms = set(hashlib.algorithms_guaranteed).union({"xxh64"}).difference({"shake_128", "shake_256"})
    KEY_DESCRIPTIONS = {
        "type": "file type (file, dir, symlink, ...)",
        "mode": "file mode (as in stat)",
        "uid": "user id of file owner",
        "gid": "group id of file owner",
        "user": "user name of file owner",
        "group": "group name of file owner",
        "path": "file path",
        "target": "link target for symlinks",
        "hlid": "hard link identity (same if hardlinking same fs object)",
        "inode": "inode number",
        "flags": "file flags",
        "extra": 'prepends {target} with " -> " for soft links and " link to " for hard links',
        "size": "file size",
        "num_chunks": "number of chunks in this file",
        "mtime": "file modification time",
        "ctime": "file change time",
        "atime": "file access time",
        "isomtime": "file modification time (ISO 8601 format)",
        "isoctime": "file change time (ISO 8601 format)",
        "isoatime": "file access time (ISO 8601 format)",
        "xxh64": "XXH64 checksum of this file (note: this is NOT a cryptographic hash!)",
        "archiveid": "internal ID of the archive",
        "archivename": "name of the archive",
    }
    KEY_GROUPS = (
        ("type", "mode", "uid", "gid", "user", "group", "path", "target", "hlid", "inode", "flags"),
        ("size", "num_chunks"),
        ("mtime", "ctime", "atime", "isomtime", "isoctime", "isoatime"),
        tuple(sorted(hash_algorithms)),
        ("archiveid", "archivename", "extra"),
    )

    KEYS_REQUIRING_CACHE = ()

    @classmethod
    def format_needs_cache(cls, format):
        format_keys = {f[1] for f in Formatter().parse(format)}
        return any(key in cls.KEYS_REQUIRING_CACHE for key in format_keys)

    def __init__(self, archive, format):
        from ..checksums import StreamingXXH64

        static_data = {"archivename": archive.name, "archiveid": archive.fpr} | self.FIXED_KEYS
        super().__init__(format, static_data)
        self.xxh64 = StreamingXXH64
        self.archive = archive
        # track which keys were requested in the format string
        self.format_keys = {f[1] for f in Formatter().parse(format)}
        self.call_keys = {
            "size": self.calculate_size,
            "num_chunks": self.calculate_num_chunks,
            "isomtime": partial(self.format_iso_time, "mtime"),
            "isoctime": partial(self.format_iso_time, "ctime"),
            "isoatime": partial(self.format_iso_time, "atime"),
            "mtime": partial(self.format_time, "mtime"),
            "ctime": partial(self.format_time, "ctime"),
            "atime": partial(self.format_time, "atime"),
        }
        for hash_function in self.hash_algorithms:
            self.call_keys[hash_function] = partial(self.hash_item, hash_function)
        self.used_call_keys = set(self.call_keys) & self.format_keys

    def get_item_data(self, item, jsonline=False):
        item_data = {}
        item_data |= {} if jsonline else self.static_data

        item_data |= text_to_json("path", item.path)
        target = item.get("target", "")
        item_data |= text_to_json("target", target)
        if not jsonline:
            item_data["extra"] = "" if not target else f" -> {item_data['target']}"

        hlid = item.get("hlid")
        hlid = bin_to_hex(hlid) if hlid else ""
        item_data["hlid"] = hlid

        mode = stat.filemode(item.mode)
        item_type = mode[0]
        item_data["type"] = item_type
        item_data["mode"] = mode

        item_data["uid"] = item.get("uid")  # int or None
        item_data["gid"] = item.get("gid")  # int or None
        item_data |= text_to_json("user", item.get("user", str(item_data["uid"])))
        item_data |= text_to_json("group", item.get("group", str(item_data["gid"])))

        item_data["flags"] = item.get("bsdflags")  # int if flags known, else (if flags unknown) None
        # inode number from source filesystem (may be absent on some platforms)
        item_data["inode"] = item.get("inode")
        for key in self.used_call_keys:
            item_data[key] = self.call_keys[key](item)
        # When producing JSON lines, also include selected static archive-level keys
        # if they were requested via --format; this mirrors text output behavior (see #9095).
        if jsonline:
            for k in ("archivename", "archiveid"):
                if k in self.format_keys:
                    item_data[k] = self.static_data[k]
        return item_data

    def calculate_num_chunks(self, item):
        return len(item.get("chunks", []))

    def calculate_size(self, item):
        # note: does not support hard link slaves, they will be size 0
        return item.get_size()

    def hash_item(self, hash_function, item):
        if "chunks" not in item:
            return ""
        if hash_function == "xxh64":
            hash = self.xxh64()
        elif hash_function in self.hash_algorithms:
            hash = hashlib.new(hash_function)
        for data in self.archive.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM):
            hash.update(data)
        return hash.hexdigest()

    def format_time(self, key, item):
        return OutputTimestamp(safe_timestamp(item.get(key) or item.mtime))

    def format_iso_time(self, key, item):
        return self.format_time(key, item).isoformat()


class DiffFormatter(BaseFormatter):
    KEY_DESCRIPTIONS = {
        "path": "archived file path",
        "change": "all available changes",
        "content": "file content change",
        "mode": "file mode change",
        "type": "file type change",
        "owner": "file owner (user/group) change",
        "user": "file user change",
        "group": "file group change",
        "link": "file link change",
        "directory": "file directory change",
        "blkdev": "file block device change",
        "chrdev": "file character device change",
        "fifo": "file fifo change",
        "mtime": "file modification time change",
        "ctime": "file change time change",
        "isomtime": "file modification time change (ISO 8601)",
        "isoctime": "file creation time change (ISO 8601)",
    }
    KEY_GROUPS = (
        ("path", "change"),
        ("content", "mode", "type", "owner", "group", "user"),
        ("link", "directory", "blkdev", "chrdev", "fifo"),
        ("mtime", "ctime", "isomtime", "isoctime"),
    )
    METADATA = ("mode", "type", "owner", "group", "user", "mtime", "ctime")

    def __init__(self, format, content_only=False):
        static_data = {} | self.FIXED_KEYS
        format = format or "{content}{link}{directory}{blkdev}{chrdev}{fifo} {path}{NL}"
        super().__init__(format, static_data)
        self.content_only = content_only
        # parse the effective format (incl. the default), so that used_call_keys gets
        # populated even when no format was given; otherwise the default keys would be
        # missing from the item data and rendering would fail.
        self.format_keys = {f[1] for f in Formatter().parse(format)}
        self.call_keys = {
            "content": self.format_content,
            "mode": self.format_mode,
            "type": partial(self.format_mode, filetype=True),
            "owner": partial(self.format_owner),
            "group": partial(self.format_owner, spec="group"),
            "user": partial(self.format_owner, spec="user"),
            "link": partial(self.format_other, "link"),
            "directory": partial(self.format_other, "directory"),
            "blkdev": partial(self.format_other, "blkdev"),
            "chrdev": partial(self.format_other, "chrdev"),
            "fifo": partial(self.format_other, "fifo"),
            "mtime": partial(self.format_time, "mtime"),
            "ctime": partial(self.format_time, "ctime"),
            "isomtime": partial(self.format_iso_time, "mtime"),
            "isoctime": partial(self.format_iso_time, "ctime"),
        }
        self.used_call_keys = set(self.call_keys) & self.format_keys
        if self.content_only:
            self.used_call_keys -= set(self.METADATA)

    def get_item_data(self, item: "ItemDiff", jsonline=False) -> dict:
        diff_data = {}
        for key in self.used_call_keys:
            diff_data[key] = self.call_keys[key](item)

        change = []
        for key in self.call_keys:
            if key in ("isomtime", "isoctime"):
                continue
            if self.content_only and key in self.METADATA:
                continue
            change.append(self.call_keys[key](item))
        diff_data["change"] = " ".join([v for v in change if v])
        diff_data["path"] = item.path
        diff_data |= {} if jsonline else self.static_data
        return diff_data

    def format_other(self, key, diff: "ItemDiff"):
        change = diff.changes().get(key)
        return f"{change.diff_type}".ljust(27) if change else ""  # 27 is the length of the content change

    def format_mode(self, diff: "ItemDiff", filetype=False):
        change = diff.type() if filetype else diff.mode()
        return f"[{change.diff_data['item1']} -> {change.diff_data['item2']}]" if change else ""

    def format_owner(self, diff: "ItemDiff", spec: Literal["owner", "user", "group"] = "owner"):
        if spec == "user":
            change = diff.user()
            return f"[{change.diff_data['item1']} -> {change.diff_data['item2']}]" if change else ""
        if spec == "group":
            change = diff.group()
            return f"[{change.diff_data['item1']} -> {change.diff_data['item2']}]" if change else ""
        if spec != "owner":
            raise ValueError(f"Invalid owner spec: {spec}")
        change = diff.owner()
        if change:
            return "[{}:{} -> {}:{}]".format(
                change.diff_data["item1"][0],
                change.diff_data["item1"][1],
                change.diff_data["item2"][0],
                change.diff_data["item2"][1],
            )
        return ""

    def format_content(self, diff: "ItemDiff"):
        change = diff.content()
        if change:
            if change.diff_type == "added":
                return "{}: {:>20}".format(change.diff_type, format_file_size(change.diff_data["added"]))
            if change.diff_type == "removed":
                return "{}: {:>18}".format(change.diff_type, format_file_size(change.diff_data["removed"]))
            if "added" not in change.diff_data and "removed" not in change.diff_data:
                return "modified:  (can't get size)"
            return "{}: {:>8} {:>8}".format(
                change.diff_type,
                format_file_size(change.diff_data["added"], precision=1, sign=True),
                format_file_size(-change.diff_data["removed"], precision=1, sign=True),
            )
        return ""

    def format_time(self, key, diff: "ItemDiff"):
        change = diff.changes().get(key)
        return f"[{key}: {change.diff_data['item1']} -> {change.diff_data['item2']}]" if change else ""

    def format_iso_time(self, key, diff: "ItemDiff"):
        change = diff.changes().get(key)
        return (
            f"[{key}: {change.diff_data['item1'].isoformat()} -> {change.diff_data['item2'].isoformat()}]"
            if change
            else ""
        )


def file_status(mode):
    if stat.S_ISREG(mode):
        return "A"
    elif stat.S_ISDIR(mode):
        return "d"
    elif stat.S_ISBLK(mode):
        return "b"
    elif stat.S_ISCHR(mode):
        return "c"
    elif stat.S_ISLNK(mode):
        return "s"
    elif stat.S_ISFIFO(mode):
        return "f"
    return "?"


def clean_lines(lines, lstrip=None, rstrip=None, remove_empty=True, remove_comments=True):
    """
    clean lines (usually read from a config file):

    1. strip whitespace (left and right), 2. remove empty lines, 3. remove comments.

    note: only "pure comment lines" are supported, no support for "trailing comments".

    :param lines: input line iterator (e.g. list or open text file) that gives unclean input lines
    :param lstrip: lstrip call arguments or False, if lstripping is not desired
    :param rstrip: rstrip call arguments or False, if rstripping is not desired
    :param remove_empty: remove empty lines
    :param remove_comments: remove comment lines (lines starting with "#")
    :return: yields processed lines
    """
    for line in lines:
        if lstrip is not False:
            line = line.lstrip(lstrip)
        if rstrip is not False:
            line = line.rstrip(rstrip)
        if remove_empty and not line:
            continue
        if remove_comments and line.startswith("#"):
            continue
        yield line
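

# Illustrative: list(clean_lines(["# comment", "", "  value  "])) == ["value"]
# (the comment and the empty line are dropped, surrounding whitespace is stripped).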


def swidth_slice(string, max_width):
    """
    Return a slice of *max_width* cells from *string*.

    Negative *max_width* means from the end of string.

    *max_width* is in units of character cells (or "columns").
    Latin characters are usually one cell wide, many CJK characters are two cells wide.
    """
    from ..platform import swidth

    reverse = max_width < 0
    max_width = abs(max_width)
    if reverse:
        string = reversed(string)
    current_swidth = 0
    result = []
    for character in string:
        current_swidth += swidth(character)
        if current_swidth > max_width:
            break
        result.append(character)
    if reverse:
        result.reverse()
    return "".join(result)


def ellipsis_truncate(msg, space):
    """
    shorten a long string by adding ellipsis between it and return it, example:
    this_is_a_very_long_string -------> this_is..._string
    """
    from ..platform import swidth

    ellipsis_width = swidth("...")
    msg_width = swidth(msg)
    if space < 8:
        # if there is very little space, just show ...
        return "..." + " " * (space - ellipsis_width)
    if space < ellipsis_width + msg_width:
        return f"{swidth_slice(msg, space // 2 - ellipsis_width)}...{swidth_slice(msg, -space // 2)}"
    return msg + " " * (space - msg_width)


class BorgJsonEncoder(json.JSONEncoder):
    def default(self, o):
        from ..legacyrepository import LegacyRepository
        from ..repository import Repository
        from ..legacyremote import LegacyRemoteRepository
        from ..remote import RemoteRepository
        from ..archive import Archive
        from ..cache import AdHocWithFilesCache

        if isinstance(o, (LegacyRepository, LegacyRemoteRepository, Repository, RemoteRepository)):
            return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
        if isinstance(o, Archive):
            return o.info()
        if isinstance(o, (AdHocWithFilesCache,)):
            return {"path": o.path}
        if isinstance(o, Path):
            return str(o)
        if callable(getattr(o, "to_json", None)):
            return o.to_json()
        return super().default(o)


def basic_json_data(manifest, *, cache=None, extra=None):
    key = manifest.key
    data = extra or {}
    data |= {"repository": BorgJsonEncoder().default(manifest.repository), "encryption": {"mode": key.ARG_NAME}}
    data["repository"]["last_modified"] = OutputTimestamp(manifest.last_timestamp)
    if key.NAME.startswith("key file"):
        data["encryption"]["keyfile"] = key.find_key()
    if cache:
        data["cache"] = cache
    return data


def json_dump(obj):
    """Dump using BorgJSONEncoder."""
    return json.dumps(obj, sort_keys=True, indent=4, cls=BorgJsonEncoder)


def json_print(obj):
    print(json_dump(obj))


def prepare_dump_dict(d):
    def decode_bytes(value):
        # this should somehow be reversible later, but usual strings should
        # look nice and chunk ids should mostly show in hex. Use a special
        # inband signaling character (ASCII DEL) to distinguish between
        # decoded and hex mode.
        if not value.startswith(b"\x7f"):
            try:
                value = value.decode()
                return value
            except UnicodeDecodeError:
                pass
        return "\u007f" + bin_to_hex(value)

    def decode_tuple(t):
        res = []
        for value in t:
            if isinstance(value, dict):
                value = decode(value)
            elif isinstance(value, tuple) or isinstance(value, list):
                value = decode_tuple(value)
            elif isinstance(value, bytes):
                value = decode_bytes(value)
            res.append(value)
        return res

    def decode(d):
        res = OrderedDict()
        for key, value in d.items():
            if isinstance(value, dict):
                value = decode(value)
            elif isinstance(value, (tuple, list)):
                value = decode_tuple(value)
            elif isinstance(value, bytes):
                value = decode_bytes(value)
            elif isinstance(value, Timestamp):
                value = value.to_unix_nano()
            if isinstance(key, bytes):
                key = key.decode()
            res[key] = value
        return res

    return decode(d)
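

# Illustrative: prepare_dump_dict({b"chunk": b"\xff\x00"}) == {"chunk": "\u007fff00"}
# (b"\xff\x00" is not valid UTF-8, so it is dumped as DEL-prefixed hex), while a
# decodable value like b"text" would come out as plain "text".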


class Highlander(argparse.Action):
    """make sure some option is only given once"""

    def __init__(self, *args, **kwargs):
        self.__called = False
        super().__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        if self.__called:
            raise argparse.ArgumentError(self, "There can be only one.")
        self.__called = True
        setattr(namespace, self.dest, values)
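

# Illustrative: parser.add_argument("--filter", action=Highlander) turns a repeated
# "--filter" on the command line into an argparse error instead of a silent override.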


class MakePathSafeAction(Highlander):
    def __call__(self, parser, namespace, path, option_string=None):
        try:
            sanitized_path = make_path_safe(path)
        except ValueError as e:
            raise argparse.ArgumentError(self, e)
        if sanitized_path == ".":
            raise argparse.ArgumentError(self, f"{path!r} is not a valid file name")
        setattr(namespace, self.dest, sanitized_path)
