File: cmdline.py

package info (click to toggle)
libvpl-tools 1.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,640 kB
  • sloc: cpp: 107,613; python: 4,303; ansic: 3,202; sh: 159; lisp: 52; makefile: 13
file content (213 lines) | stat: -rw-r--r-- 6,117 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
############################################################################
# Copyright (C) Intel Corporation
#
# SPDX-License-Identifier: MIT
############################################################################
"""commandline like tools"""

import shutil
import os
import sys
from contextlib import contextmanager
import time
import subprocess  # nosec

VERBOSE = False
if 'VERBOSE' in os.environ:
    if os.environ['VERBOSE'] not in ['']:
        VERBOSE = True


def _resolve_path(path):
    """resolve a path name from either a string, or a list of sub-paths"""
    if isinstance(path, list):
        return os.path.join(*path)
    return path


def _escape_cmd_arg(arg):
    """quote/escape and argument for a command line call so that it can
    be safely used even if it has special charaters"""
    if ' ' in arg or '"' in arg:
        return '"' + arg.replace('"', '""') + '"'
    return arg


def log(message):
    """Log activity"""
    if VERBOSE:
        print(f">> {message}", file=sys.stderr)
        sys.stderr.flush()


@contextmanager
def pushd(dst):
    """change working directory"""
    cur_dir = os.getcwd()
    dest = os.path.join(cur_dir, _resolve_path(dst))
    os.chdir(dest)
    log(f'pushd {dest}')
    try:
        yield
    finally:
        log(f'popd -> {cur_dir}')
        os.chdir(cur_dir)


#pylint: disable=invalid-name
def rm(target):
    """delete a file or folder"""
    target = _resolve_path(target)
    if os.path.exists(target):
        # Delete sometimes fails if done immediately, timeout
        # is not great, but allows filesystem settings to stabilize.
        timeout = time.time() + 10
        while time.time() < timeout:
            try:
                if os.path.isfile(target):
                    log(f'rm {target}')
                    os.remove(target)
                    break
                if os.path.isdir(target):
                    log(f'rm -rf {target}')
                    shutil.rmtree(target)
                    break
            except PermissionError:
                time.sleep(1)


#pylint: disable=invalid-name
def md(target):
    """make a folder"""
    target = _resolve_path(target)
    if target and not os.path.exists(target):
        log(f'mkdir -p {target}')
        os.makedirs(target)


#pylint: disable=invalid-name
def cp(src, dest):
    """copy a file or folder"""
    src = _resolve_path(src)
    dest = _resolve_path(dest)
    if os.path.exists(src):
        rm(dest)
        md(os.path.dirname(src))
        if os.path.isfile(src):
            log(f'cp {src} {dest}')
            shutil.copyfile(src, dest)
        elif os.path.isdir(src):
            log(f'cp {src} {dest}')
            shutil.copytree(src, dest)
        else:
            raise RuntimeError("Cannot copy unknown file type")


def join_command(command):
    """Join a series or parameters into a command, escaping if needed"""
    return ' '.join([_escape_cmd_arg(argument) for argument in command])


def run_command(*args, no_throw=False, env=None):
    """Run a command"""
    if len(args) == 1:
        cmd = args[0]
    else:
        cmd = join_command(args)
    log(f'{cmd}')
    if os.name != 'nt':
        cmd = "exec bash -c '" + cmd + "'"
    with subprocess.Popen(cmd, shell=True, env=env) as proc:  # nosec
        proc.communicate()
        if not no_throw and proc.returncode != 0:
            raise RuntimeError("Error running command: " + cmd)
        return proc.returncode


def run_commands(*args, no_throw=False, env=None):
    """Run several commands"""
    commands = []
    for arg in args:
        if isinstance(arg, (str)):
            commands.append(arg)
        else:
            commands.append(join_command(arg))
    if os.name == 'nt':
        script_file = "temp.bat"
    else:
        script_file = "temp.sh"
    with open(script_file, "w", encoding="utf-8") as script:
        log('echo "')
        for cmd in commands:
            log(f'{cmd}')
            script.write(cmd + "\n")
        log(f'" > {script_file}')
    log('{script_file}')
    if os.name == 'nt':
        cmd = script_file
    else:
        cmd = "exec bash -c 'source " + script_file + "'"
    with subprocess.Popen(cmd, shell=True, env=env) as proc:  # nosec
        proc.communicate()
        rm(script_file)
        if not no_throw and proc.returncode != 0:
            raise RuntimeError("Error running: \n" + "\n".join(commands))
        return proc.returncode


def capture_command(*args, env=None):
    """Run a command and capture the output"""
    if len(args) == 1:
        cmd = args[0]
    else:
        cmd = join_command(args)
    log(f'{cmd}')
    if os.name != 'nt':
        cmd = "exec bash -c '" + cmd + "'"
    with subprocess.Popen(  # nosec
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            shell=True,
            env=env) as proc:
        result = proc.communicate()
        return (result[0], result[1], proc.returncode)


def capture_commands(*args, env=None):
    """Run several commands and capture the output"""
    commands = []
    for arg in args:
        if not arg:
            continue
        if isinstance(arg, (str)):
            commands.append(arg)
        else:
            commands.append(join_command(arg))
    script_file = None
    if os.name == 'nt':
        script_file = "temp.bat"
    else:
        script_file = "temp.sh"
    with open(script_file, "w", encoding="utf-8") as script:
        log('echo "')
        for cmd in commands:
            log(f'{cmd}')
            script.write(cmd + "\n")
        log(f'" > {script_file}')
    log('{script_file}')
    if os.name == 'nt':
        cmd = script_file
    else:
        cmd = "exec bash -c 'source " + script_file + "'"
    with subprocess.Popen(  # nosec
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            shell=True,
            env=env) as proc:
        result = proc.communicate()
        rm(script_file)
        return (result[0], result[1], proc.returncode)