1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
|
# Stuff to talk to virsh, for libreswan
#
# Copyright (C) 2015-2019 Andrew Cagney <cagney@gnu.org>
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version. See <https://www.gnu.org/licenses/gpl2.txt>.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
import os
import pexpect
import re
from fab import logutil
from fab import timing
TIMEOUT = 10
# Names for each of the groups in the above (used below).
#
# Note that these are STRINGS and not BYTES. Even though any names in
# re.match(b'...') are bytes, the regex code indexes the group
# dictionary with the names converted to strings.
USERNAME_GROUP = "username"
HOSTNAME_GROUP = "hostname"
BASENAME_GROUP = "basename"
STATUS_GROUP = "status"
DOLLAR_GROUP = "dollar"
# XXX:
#
# There's this new fangled thing called "bracketed paste mode"
# which throws magic escape characters into the output stream.
# The below will match them letting KVMSH login. This doesn't do
# anything for *.console.verbose.txt that will likely also be full
# of it.
#
# Can't anchor this pattern to the end-of-buffer using $ as other
# random output may appear.
#
# Construct the regex as a STRING, and then convert it to BYTEs.
# The regex code indexes the group dictionary using the string
# name so this is hopefully easier?
_PROMPT_PATTERN = (r'(|\x1b\[\?2004h)' + # bracketed paste mode prefix!
r'(' +
( # HOSTNAME#
r'[-a-z0-9]+'
) +
r'|' +
( # [USER@HOST DIRECTORY [STATUS]]#
r'\[' +
r'(?P<' + USERNAME_GROUP + r'>[-.a-z0-9]+)' +
r'@' +
r'(?P<' + HOSTNAME_GROUP + r'>[-a-z0-9]+)' +
r' ' +
r'(?P<' + BASENAME_GROUP + r'>[-+=:,\.a-z0-9A-Z_~]+)' +
r'(| (?P<' + STATUS_GROUP + r'>[0-9]+))' +
r'\]'
) +
r')' +
r'(?P<' + DOLLAR_GROUP + r'>[#\$])' +
r' ').encode()
_PROMPT_REGEX = re.compile(_PROMPT_PATTERN)
def _check_prompt_group(logger, match, field, expected):
if expected:
found = match.group(field)
if expected.encode() != found:
# Throw TIMEOUT as that is what is expected and what
# would have happened.
raise pexpect.TIMEOUT("incorrect prompt, field '%s' should be '%s' but was '%s'" \
% (field, expected, found))
# This file-like class passes all writes on to the LOGGER at DEBUG.
# It is is used to direct pexpect's .logfile_read and .logfile_send
# files into the logging system.
class _Debug:
def __init__(self, logger, message):
self.logger = logger
self.message = message
def close(self):
pass
def write(self, text):
self.logger.debug(self.message, ascii(text))
def flush(self):
pass
class _Redirect:
def __init__(self, files):
self.files = files
def close(self):
pass
def write(self, text):
for f in self.files:
f.buffer.write(text)
def flush(self):
for f in self.files:
f.flush()
class Console(pexpect.spawn):
def __init__(self, command, logger, host_name=None):
# Create the child.
logger.debug("spawning '%s'", " ".join(command))
pexpect.spawn.__init__(self, command[0], args=command[1:], timeout=TIMEOUT)
self.logger = logger
self.unicode_output_files = []
self._basename = None
self._host_name = host_name
self.prompt = _PROMPT_REGEX
#This crashes inside of pexpect!
#self.logger.debug("child is '%s'", self.child)
# route low level output to the logger
self.logfile_read = _Debug(self.logger, "read <<%s>>>")
self.logfile_send = _Debug(self.logger, "send <<%s>>>")
def _check_prompt(self):
"""Match wild-card of the prompt pattern; return status"""
self.logger.debug("match %s contains %s", self.match, self.match.groupdict())
# If basename is known, make certain it doesn't change.
# Catches scripts changing directory.
_check_prompt_group(self.logger, self.match, HOSTNAME_GROUP, self._host_name)
_check_prompt_group(self.logger, self.match, BASENAME_GROUP, self._basename)
# If there's no status, return None, not empty.
status = self.match.group(STATUS_GROUP)
if status:
status = int(status)
else:
status = None
self.logger.debug("exit code '%s'", status)
return status
def run(self, command, timeout=TIMEOUT):
self.logger.debug("run '%s' expecting prompt", command)
self.sendline(command)
# This can throw a pexpect.TIMEOUT or pexpect.EOF exception
self.expect(self.prompt, timeout=timeout)
status = self._check_prompt()
self.logger.debug("run exit status %s", status)
return status
def chdir(self, directory):
# save directory so run() can verify it
self._basename = os.path.basename(directory)
if self.run("cd " + directory):
# i.e., non-zero exit code
raise Exception("'%s' failed", directory)
def redirect_output(self, unicode_file):
if unicode_file:
self.unicode_output_files.append(unicode_file)
self.logger.debug("switching output from %s to %s's buffer",
self.logfile, unicode_file)
self.logfile = _Redirect(self.unicode_output_files)
else:
self.unicode_output_files = []
self.logfile = None
def drain(self):
self.logger.debug("draining any existing output")
if self.expect([rb'.+', pexpect.TIMEOUT], timeout=0) == 0:
self.logger.info("discarding '%s' and re-draining", self.match)
self.expect([rb'.+', pexpect.TIMEOUT], timeout=0)
|