File: execute.py

package info (click to toggle)
python-graphviz 0.20.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,188 kB
  • sloc: python: 4,098; makefile: 13
file content (132 lines) | stat: -rw-r--r-- 4,458 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""Run subprocesses with ``subprocess.run()`` and ``subprocess.Popen()``."""

import errno
import logging
import os
import subprocess
import sys
import typing

from .. import _compat

__all__ = ['run_check', 'ExecutableNotFound', 'CalledProcessError']


log = logging.getLogger(__name__)


BytesOrStrIterator = typing.Union[typing.Iterator[bytes],
                                  typing.Iterator[str]]


@typing.overload
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[typing.Iterator[bytes]] = ...,
              encoding: None = ...,
              quiet: bool = ...,
              **kwargs) -> subprocess.CompletedProcess:
    """Accept bytes input_lines with default ``encoding=None```."""


@typing.overload
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[typing.Iterator[str]] = ...,
              encoding: str,
              quiet: bool = ...,
              **kwargs) -> subprocess.CompletedProcess:
    """Accept string input_lines when given ``encoding``."""


@typing.overload
def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[BytesOrStrIterator] = ...,
              encoding: typing.Optional[str] = ...,
              capture_output: bool = ...,
              quiet: bool = ...,
              **kwargs) -> subprocess.CompletedProcess:
    """Accept bytes or string input_lines depending on ``encoding``."""


def run_check(cmd: typing.Sequence[typing.Union[os.PathLike, str]], *,
              input_lines: typing.Optional[BytesOrStrIterator] = None,
              encoding: typing.Optional[str] = None,
              quiet: bool = False,
              **kwargs) -> subprocess.CompletedProcess:
    """Run the command described by ``cmd``
        with ``check=True`` and return its completed process.

    Raises:
        CalledProcessError: if the returncode of the subprocess is non-zero.
    """
    log.debug('run %r', cmd)
    if not kwargs.pop('check', True):  # pragma: no cover
        raise NotImplementedError('check must be True or omited')

    if encoding is not None:
        kwargs['encoding'] = encoding

    kwargs.setdefault('startupinfo', _compat.get_startupinfo())

    try:
        if input_lines is not None:
            assert kwargs.get('input') is None
            assert iter(input_lines) is input_lines
            if kwargs.pop('capture_output'):
                kwargs['stdout'] = kwargs['stderr'] = subprocess.PIPE
            proc = _run_input_lines(cmd, input_lines, kwargs=kwargs)
        else:
            proc = subprocess.run(cmd, **kwargs)
    except OSError as e:
        if e.errno == errno.ENOENT:
            raise ExecutableNotFound(cmd) from e
        raise

    if not quiet and proc.stderr:
        _write_stderr(proc.stderr)

    try:
        proc.check_returncode()
    except subprocess.CalledProcessError as e:
        raise CalledProcessError(*e.args)

    return proc


def _run_input_lines(cmd, input_lines, *, kwargs):
    popen = subprocess.Popen(cmd, stdin=subprocess.PIPE, **kwargs)

    stdin_write = popen.stdin.write
    for line in input_lines:
        stdin_write(line)

    stdout, stderr = popen.communicate()
    return subprocess.CompletedProcess(popen.args, popen.returncode,
                                       stdout=stdout, stderr=stderr)


def _write_stderr(stderr) -> None:
    if isinstance(stderr, bytes):
        stderr_encoding = (getattr(sys.stderr, 'encoding', None)
                           or sys.getdefaultencoding())
        stderr = stderr.decode(stderr_encoding)

    sys.stderr.write(stderr)
    sys.stderr.flush()
    return None


class ExecutableNotFound(RuntimeError):
    """:exc:`RuntimeError` raised if the Graphviz executable is not found."""

    _msg = ('failed to execute {!r}, '
            'make sure the Graphviz executables are on your systems\' PATH')

    def __init__(self, args) -> None:
        super().__init__(self._msg.format(*args))


class CalledProcessError(subprocess.CalledProcessError):
    """:exc:`~subprocess.CalledProcessError` raised if a subprocess ``returncode`` is not ``0``."""  # noqa: E501

    def __str__(self) -> 'str':
        return f'{super().__str__()} [stderr: {self.stderr!r}]'