File: session.py

package info (click to toggle)
python-plumbum 1.4.2-1
  • links: PTS
  • area: main
  • in suites: jessie-kfreebsd
  • size: 348 kB
  • sloc: python: 3,452; makefile: 5
file content (225 lines) | stat: -rw-r--r-- 8,748 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import time
import random
import logging
import threading
from plumbum.commands import BaseCommand, run_proc
from plumbum.lib import six


class ShellSessionError(Exception):
    """Raises when something goes wrong when calling
    :func:`ShellSession.popen <plumbum.session.ShellSession.popen>`"""
    pass

shell_logger = logging.getLogger("plumbum.shell")


#===================================================================================================
# Shell Session Popen
#===================================================================================================
class MarkedPipe(object):
    """A pipe-like object from which you can read lines; the pipe will return report EOF (the
    empty string) when a special marker is detected"""
    __slots__ = ["pipe", "marker"]
    def __init__(self, pipe, marker):
        self.pipe = pipe
        self.marker = marker
        if six.PY3:
            self.marker = six.bytes(self.marker, "ascii")
    def close(self):
        """'Closes' the marked pipe; following calls to ``readline`` will return """""
        # consume everything
        while self.readline():
            pass
        self.pipe = None
    def readline(self):
        """Reads the next line from the pipe; returns "" when the special marker is reached.
        Raises ``EOFError`` if the underlying pipe has closed"""
        if self.pipe is None:
            return six.b("")
        line = self.pipe.readline()
        if not line:
            raise EOFError()
        if line.strip() == self.marker:
            self.pipe = None
            line = six.b("")
        return line


class SessionPopen(object):
    """A shell-session-based ``Popen``-like object (has the following attributes: ``stdin``,
    ``stdout``, ``stderr``, ``returncode``)"""
    def __init__(self, argv, isatty, stdin, stdout, stderr, encoding):
        self.argv = argv
        self.isatty = isatty
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.encoding = encoding
        self.returncode = None
        self._done = False
    def poll(self):
        """Returns the process' exit code or ``None`` if it's still running"""
        if self._done:
            return self.returncode
        else:
            return None
    def wait(self):
        """Waits for the process to terminate and returns its exit code"""
        self.communicate()
        return self.returncode
    def communicate(self, input = None):
        """Consumes the process' stdout and stderr until the it terminates.

        :param input: An optional bytes/buffer object to send to the process over stdin
        :returns: A tuple of (stdout, stderr)
        """
        stdout = []
        stderr = []
        sources = [("1", stdout, self.stdout)]
        if not self.isatty:
            # in tty mode, stdout and stderr are unified
            sources.append(("2", stderr, self.stderr))
        i = 0
        while sources:
            if input:
                chunk = input[:1000]
                self.stdin.write(chunk)
                self.stdin.flush()
                input = input[1000:]
            i = (i + 1) % len(sources)
            name, coll, pipe = sources[i]
            line = pipe.readline()
            shell_logger.debug("%s> %r", name, line)
            if not line:
                del sources[i]
            else:
                coll.append(line)
        if self.isatty:
            stdout.pop(0)  # discard first line of prompt
        try:
            self.returncode = int(stdout.pop(-1))
        except (IndexError, ValueError):
            self.returncode = "Unknown"
        self._done = True
        stdout = six.b("").join(stdout)
        stderr = six.b("").join(stderr)
        return stdout, stderr


class ShellSession(object):
    """An abstraction layer over *shell sessions*. A shell session is the execution of an
    interactive shell (``/bin/sh`` or something compatible), over which you may run commands
    (sent over stdin). The output of is then read from stdout and stderr. Shell sessions are
    less "robust" than executing a process on its own, and they are susseptible to all sorts
    of malformatted-strings attacks, and there is little benefit from using them locally.
    However, they can greatly speed up remote connections, and are required for the implementation
    of :class:`SshMachine <plumbum.machines.remote.SshMachine>`, as they allow us to send multiple
    commands over a single SSH connection (setting up separate SSH connections incurs a high
    overhead). Try to avoid using shell sessions, unless you know what you're doing.

    Instances of this class may be used as *context-managers*.

    :param proc: The underlying shell process (with open stdin, stdout and stderr)
    :param encoding: The encoding to use for the shell session. If ``"auto"``, the underlying
                     process' encoding is used.
    :param isatty: If true, assume the shell has a TTY and that stdout and stderr are unified
    :param connect_timeout: The timeout to connect to the shell, after which, if no prompt
                            is seen, the shell process is killed
    """
    def __init__(self, proc, encoding = "auto", isatty = False, connect_timeout = 5):
        self.proc = proc
        self.encoding = proc.encoding if encoding == "auto" else encoding
        self.isatty = isatty
        self._current = None
        if connect_timeout:
            def closer():
                shell_logger.error("Connection to %s timed out (%d sec)", proc, connect_timeout)
                self.close()
            timer = threading.Timer(connect_timeout, self.close)
            timer.start()
        self.run("")
        if connect_timeout:
            timer.cancel()

    def __enter__(self):
        return self
    def __exit__(self, t, v, tb):
        self.close()
    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def alive(self):
        """Returns ``True`` if the underlying shell process is alive, ``False`` otherwise"""
        return self.proc and self.proc.poll() is None

    def close(self):
        """Closes (terminates) the shell session"""
        if not self.alive():
            return
        try:
            self.proc.stdin.write(six.b("\nexit\n\n\nexit\n\n"))
            self.proc.stdin.flush()
            time.sleep(0.05)
        except (ValueError, EnvironmentError):
            pass
        for p in [self.proc.stdin, self.proc.stdout, self.proc.stderr]:
            try:
                p.close()
            except Exception:
                pass
        try:
            self.proc.kill()
        except EnvironmentError:
            pass
        self.proc = None

    def popen(self, cmd):
        """Runs the given command in the shell, adding some decoration around it. Only a single
        command can be executed at any given time.

        :param cmd: The command (string or :class:`Command <plumbum.commands.BaseCommand>` object)
                    to run
        :returns: A :class:`SessionPopen <plumbum.session.SessionPopen>` instance
        """
        if self.proc is None:
            raise ShellSessionError("Shell session has already been closed")
        if self._current and not self._current._done:
            raise ShellSessionError("Each shell may start only one process at a time")

        if isinstance(cmd, BaseCommand):
            full_cmd = cmd.formulate(1)
        else:
            full_cmd = cmd
        marker = "--.END%s.--" % (time.time() * random.random(),)
        if full_cmd.strip():
            full_cmd += " ; "
        else:
            full_cmd = "true ; "
        full_cmd += "echo $? ; echo '%s'" % (marker,)
        if not self.isatty:
            full_cmd += " ; echo '%s' 1>&2" % (marker,)
        if self.encoding:
            full_cmd = full_cmd.encode(self.encoding)
        shell_logger.debug("Running %r", full_cmd)
        self.proc.stdin.write(full_cmd + six.b("\n"))
        self.proc.stdin.flush()
        self._current = SessionPopen(full_cmd, self.isatty, self.proc.stdin,
            MarkedPipe(self.proc.stdout, marker), MarkedPipe(self.proc.stderr, marker),
            self.encoding)
        return self._current

    def run(self, cmd, retcode = 0):
        """Runs the given command

        :param cmd: The command (string or :class:`Command <plumbum.commands.BaseCommand>` object)
                    to run
        :param retcode: The expected return code (0 by default). Set to ``None`` in order to
                        ignore erroneous return codes
        :returns: A tuple of (return code, stdout, stderr)
        """
        return run_proc(self.popen(cmd), retcode)