1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
|
import time
import random
import logging
import threading
from plumbum.commands import BaseCommand, run_proc
from plumbum.lib import six
class ShellSessionError(Exception):
"""Raises when something goes wrong when calling
:func:`ShellSession.popen <plumbum.session.ShellSession.popen>`"""
pass
shell_logger = logging.getLogger("plumbum.shell")
#===================================================================================================
# Shell Session Popen
#===================================================================================================
class MarkedPipe(object):
"""A pipe-like object from which you can read lines; the pipe will return report EOF (the
empty string) when a special marker is detected"""
__slots__ = ["pipe", "marker"]
def __init__(self, pipe, marker):
self.pipe = pipe
self.marker = marker
if six.PY3:
self.marker = six.bytes(self.marker, "ascii")
def close(self):
"""'Closes' the marked pipe; following calls to ``readline`` will return """""
# consume everything
while self.readline():
pass
self.pipe = None
def readline(self):
"""Reads the next line from the pipe; returns "" when the special marker is reached.
Raises ``EOFError`` if the underlying pipe has closed"""
if self.pipe is None:
return six.b("")
line = self.pipe.readline()
if not line:
raise EOFError()
if line.strip() == self.marker:
self.pipe = None
line = six.b("")
return line
class SessionPopen(object):
"""A shell-session-based ``Popen``-like object (has the following attributes: ``stdin``,
``stdout``, ``stderr``, ``returncode``)"""
def __init__(self, argv, isatty, stdin, stdout, stderr, encoding):
self.argv = argv
self.isatty = isatty
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
self.encoding = encoding
self.returncode = None
self._done = False
def poll(self):
"""Returns the process' exit code or ``None`` if it's still running"""
if self._done:
return self.returncode
else:
return None
def wait(self):
"""Waits for the process to terminate and returns its exit code"""
self.communicate()
return self.returncode
def communicate(self, input = None):
"""Consumes the process' stdout and stderr until the it terminates.
:param input: An optional bytes/buffer object to send to the process over stdin
:returns: A tuple of (stdout, stderr)
"""
stdout = []
stderr = []
sources = [("1", stdout, self.stdout)]
if not self.isatty:
# in tty mode, stdout and stderr are unified
sources.append(("2", stderr, self.stderr))
i = 0
while sources:
if input:
chunk = input[:1000]
self.stdin.write(chunk)
self.stdin.flush()
input = input[1000:]
i = (i + 1) % len(sources)
name, coll, pipe = sources[i]
line = pipe.readline()
shell_logger.debug("%s> %r", name, line)
if not line:
del sources[i]
else:
coll.append(line)
if self.isatty:
stdout.pop(0) # discard first line of prompt
try:
self.returncode = int(stdout.pop(-1))
except (IndexError, ValueError):
self.returncode = "Unknown"
self._done = True
stdout = six.b("").join(stdout)
stderr = six.b("").join(stderr)
return stdout, stderr
class ShellSession(object):
"""An abstraction layer over *shell sessions*. A shell session is the execution of an
interactive shell (``/bin/sh`` or something compatible), over which you may run commands
(sent over stdin). The output of is then read from stdout and stderr. Shell sessions are
less "robust" than executing a process on its own, and they are susseptible to all sorts
of malformatted-strings attacks, and there is little benefit from using them locally.
However, they can greatly speed up remote connections, and are required for the implementation
of :class:`SshMachine <plumbum.machines.remote.SshMachine>`, as they allow us to send multiple
commands over a single SSH connection (setting up separate SSH connections incurs a high
overhead). Try to avoid using shell sessions, unless you know what you're doing.
Instances of this class may be used as *context-managers*.
:param proc: The underlying shell process (with open stdin, stdout and stderr)
:param encoding: The encoding to use for the shell session. If ``"auto"``, the underlying
process' encoding is used.
:param isatty: If true, assume the shell has a TTY and that stdout and stderr are unified
:param connect_timeout: The timeout to connect to the shell, after which, if no prompt
is seen, the shell process is killed
"""
def __init__(self, proc, encoding = "auto", isatty = False, connect_timeout = 5):
self.proc = proc
self.encoding = proc.encoding if encoding == "auto" else encoding
self.isatty = isatty
self._current = None
if connect_timeout:
def closer():
shell_logger.error("Connection to %s timed out (%d sec)", proc, connect_timeout)
self.close()
timer = threading.Timer(connect_timeout, self.close)
timer.start()
self.run("")
if connect_timeout:
timer.cancel()
def __enter__(self):
return self
def __exit__(self, t, v, tb):
self.close()
def __del__(self):
try:
self.close()
except Exception:
pass
def alive(self):
"""Returns ``True`` if the underlying shell process is alive, ``False`` otherwise"""
return self.proc and self.proc.poll() is None
def close(self):
"""Closes (terminates) the shell session"""
if not self.alive():
return
try:
self.proc.stdin.write(six.b("\nexit\n\n\nexit\n\n"))
self.proc.stdin.flush()
time.sleep(0.05)
except (ValueError, EnvironmentError):
pass
for p in [self.proc.stdin, self.proc.stdout, self.proc.stderr]:
try:
p.close()
except Exception:
pass
try:
self.proc.kill()
except EnvironmentError:
pass
self.proc = None
def popen(self, cmd):
"""Runs the given command in the shell, adding some decoration around it. Only a single
command can be executed at any given time.
:param cmd: The command (string or :class:`Command <plumbum.commands.BaseCommand>` object)
to run
:returns: A :class:`SessionPopen <plumbum.session.SessionPopen>` instance
"""
if self.proc is None:
raise ShellSessionError("Shell session has already been closed")
if self._current and not self._current._done:
raise ShellSessionError("Each shell may start only one process at a time")
if isinstance(cmd, BaseCommand):
full_cmd = cmd.formulate(1)
else:
full_cmd = cmd
marker = "--.END%s.--" % (time.time() * random.random(),)
if full_cmd.strip():
full_cmd += " ; "
else:
full_cmd = "true ; "
full_cmd += "echo $? ; echo '%s'" % (marker,)
if not self.isatty:
full_cmd += " ; echo '%s' 1>&2" % (marker,)
if self.encoding:
full_cmd = full_cmd.encode(self.encoding)
shell_logger.debug("Running %r", full_cmd)
self.proc.stdin.write(full_cmd + six.b("\n"))
self.proc.stdin.flush()
self._current = SessionPopen(full_cmd, self.isatty, self.proc.stdin,
MarkedPipe(self.proc.stdout, marker), MarkedPipe(self.proc.stderr, marker),
self.encoding)
return self._current
def run(self, cmd, retcode = 0):
"""Runs the given command
:param cmd: The command (string or :class:`Command <plumbum.commands.BaseCommand>` object)
to run
:param retcode: The expected return code (0 by default). Set to ``None`` in order to
ignore erroneous return codes
:returns: A tuple of (return code, stdout, stderr)
"""
return run_proc(self.popen(cmd), retcode)
|