File: fs.py

package info (click to toggle)
zabbix-cli 3.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,860 kB
  • sloc: python: 18,557; makefile: 3
file content (137 lines) | stat: -rw-r--r-- 4,577 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""Filesystem utilities."""

from __future__ import annotations

import logging
import os
import re
import subprocess
import sys
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
from typing import Optional

from zabbix_cli.exceptions import ZabbixCLIError
from zabbix_cli.exceptions import ZabbixCLIFileError
from zabbix_cli.exceptions import ZabbixCLIFileNotFoundError
from zabbix_cli.output.console import print_path
from zabbix_cli.output.console import success

logger = logging.getLogger(__name__)


def read_file(file: Path) -> str:
    """Attempts to read the contents of a file."""
    if not file.exists():
        raise ZabbixCLIFileNotFoundError(f"File {file} does not exist")
    if not file.is_file():
        raise ZabbixCLIFileError(f"{file} is not a file")
    try:
        return file.read_text()
    except OSError as e:
        raise ZabbixCLIFileError(f"Unable to read file {file}") from e


def open_directory(
    directory: Path, command: Optional[str] = None, *, force: bool = False
) -> None:
    """Open directory in file explorer.

    Prints the path to the directory to stderr if no window server is detected.
    The path must be a directory, otherwise a ZabbixCLIError is raised.

    Args:
        directory (Path): The directory to open.
        command (str, optional): The command to use to open the directory. If `None`, the command is determined based on the platform.
        force (bool, optional): If `True`, open the directory even if no window server is detected. Defaults to `False`.
    """
    try:
        if not directory.exists():
            raise FileNotFoundError
        directory = directory.resolve(strict=True)
    except FileNotFoundError as e:
        raise ZabbixCLIError(f"Directory {directory} does not exist") from e
    except OSError as e:
        raise ZabbixCLIError(f"Unable to resolve symlinks for {directory}") from e
    if not directory.is_dir():
        raise ZabbixCLIError(f"{directory} is not a directory")

    spath = str(directory)
    if sys.platform == "win32":
        subprocess.run([command or "explorer", spath])
    elif sys.platform == "darwin":
        subprocess.run([command or "open", spath])
    else:  # Linux and Unix
        if not os.environ.get("DISPLAY") and not force:
            print_path(directory)
            return
        subprocess.run([command or "xdg-open", spath])
    success(f"Opened {directory}")


def mkdir_if_not_exists(path: Path) -> None:
    """Create a directory for a given path if it does not exist.

    Returns the path if it was created, otherwise None.
    """
    if path.exists():
        return
    try:
        path.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        raise ZabbixCLIFileError(f"Failed to create directory {path}: {e}") from e
    else:
        logger.info("Created directory: %s", path)


def sanitize_filename(filename: str) -> str:
    """Make a filename safe(r) for use in filesystems.

    Very naive implementation that removes illegal characters.
    Does not check for reserved names or path length.
    """
    return re.sub(r"[^\w\-.]", "_", filename)


def make_executable(path: Path) -> None:
    """Make a file executable."""
    if sys.platform == "win32":
        logger.debug("Skipping making file %s executable on Windows", path)
        return

    if not path.exists():
        raise ZabbixCLIFileNotFoundError(
            f"File {path} does not exist. Unable to make it executable."
        )
    mode = path.stat().st_mode
    new_mode = mode | (mode & 0o444) >> 2  # copy R bits to X
    if new_mode != mode:
        path.chmod(new_mode)
        logger.info("Changed file mode of %s from %o to %o", path, mode, new_mode)
    else:
        logger.debug("File %s is already executable", path)


def move_file(src: Path, dest: Path, *, mkdir: bool = True) -> None:
    """Move a file to a new location."""
    try:
        if mkdir:
            mkdir_if_not_exists(dest.parent)
        src.rename(dest)
    except Exception as e:
        raise ZabbixCLIError(f"Failed to move {src} to {dest}: {e}") from e
    else:
        logger.info("Moved %s to %s", src, dest)


@contextmanager
def temp_directory() -> Generator[Path, None, None]:
    """Context manager for creating a temporary directory.

    Ripped from: https://github.com/pypa/hatch/blob/35f8ffdacc937bdcf3b250e0be1bbdf5cde30c4c/src/hatch/utils/fs.py#L112-L117
    """
    from tempfile import TemporaryDirectory

    with TemporaryDirectory() as d:
        yield Path(d).resolve()