#!/usr/bin/env python3
import argparse
import re
import serial
import os
import shutil
import subprocess
import sys
import time
def _build_parser():
    """Create the command line parser used by this script.

    The options cover the QEMU log to inspect, the login credentials, the
    timing limits, the screenshot file name and the optional final
    poweroff. The positional arguments are the commands to run after
    logging in.
    """
    cli = argparse.ArgumentParser(
        description="Connect to serial console to execute stuff",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Where to look for the serial port announcement.
    cli.add_argument(
        "--qemu-log",
        dest="qemu_log",
        required=True,
        help="QEMU log to look for serial port info in",
    )

    # Login related settings.
    cli.add_argument(
        "--hostname",
        default="trixie",
        help="hostname of the system for login process",
    )
    cli.add_argument(
        "--user",
        default="root",
        help="user name to use for login",
    )
    cli.add_argument(
        "--password",
        default="grml",
        help="password for login",
    )

    # The numeric defaults are given as strings; argparse runs string
    # defaults through ``type`` just like values from the command line.
    cli.add_argument(
        "--timeout",
        default="180",
        type=int,
        help="Maximum time for finding the login prompt, in seconds",
    )
    cli.add_argument(
        "--screenshot",
        default="screenshot.jpg",
        help="file name for screenshot captured via VNC on error",
    )
    cli.add_argument(
        "--tries",
        default="12",
        type=int,
        help="Number of retries for finding the login prompt",
    )
    cli.add_argument(
        "--poweroff",
        action="store_true",
        default=False,
        help='send "poweroff" command after all other commands',
    )

    # One or more commands, sent to the guest in the given order.
    cli.add_argument(
        "command",
        nargs="+",
        help="command to execute after logging in",
    )
    return cli


parser = _build_parser()
def print_ser_lines(ser):
    """Echo everything ``ser.readlines()`` returns to stdout.

    Each received line is printed on its own row, prefixed with ``<<``.
    """
    received = ser.readlines()
    for raw in received:
        # raw is a binary string, so it shows up in its b'...' form
        print("<<", raw)
def write_ser_line(ser, text):
    """Send *text* followed by a newline to *ser* and flush it.

    The text is also echoed to stdout, prefixed with ``>>``.
    """
    print(">>", text)
    payload = "{}\n".format(text).encode()
    ser.write(payload)
    ser.flush()
def wait_ser_text(ser, texts, timeout):
    """Read from *ser* until one of *texts* appears in a received line.

    Args:
        ser: serial object; its ``timeout`` attribute is set to 1 here
            (presumably so that each ``readlines()`` call returns promptly).
        texts: a single string or a list of strings to look for.
        timeout: number of seconds to keep reading before giving up.

    Returns:
        The entry of *texts* that was found, or ``False`` if the deadline
        passed without a match. Every line of a batch is still printed
        after a hit, so a later matching line of the same batch wins.
    """
    candidates = [texts] if isinstance(texts, str) else texts
    deadline = time.time() + timeout
    ser.timeout = 1
    needles = [candidate.encode("utf-8") for candidate in candidates]
    print(time.time(), "D: expecting one of", needles)

    matched = False
    while matched is False and deadline > time.time():
        for raw in ser.readlines():
            print(time.time(), "<<", raw)  # raw will be a binary string
            # The first candidate (in list order) contained in this line.
            hit = next(
                (cand for cand, needle in zip(candidates, needles) if needle in raw),
                None,
            )
            if hit is not None:
                matched = hit
    return matched
def login(ser, hostname, user, password, timeout):
    """Log in on the serial console unless a shell prompt is already there.

    Args:
        ser: serial object to talk to.
        hostname: host name expected in the login and shell prompts.
        user: user name to log in as.
        password: password sent after the password prompt.
        timeout: base timeout in seconds; waiting for the login prompt and
            for the shell prompt uses four times this value.

    Raises:
        ValueError: if the login, password or shell prompt does not show
            up in time.
    """
    prompt_login = "%s login:" % hostname
    prompt_shell = "%s@%s" % (user, hostname)
    long_wait = timeout * 4

    # A bare newline makes the console print its current prompt again.
    write_ser_line(ser, "")
    seen = wait_ser_text(ser, [prompt_shell, prompt_login], timeout=long_wait)
    if seen is False:
        raise ValueError("timeout waiting for login prompt")
    if seen == prompt_shell:
        # Already logged in, nothing left to do.
        return

    print("Logging in...")
    write_ser_line(ser, user)
    got_password_prompt = wait_ser_text(ser, "Password:", timeout=timeout)
    if not got_password_prompt:
        raise ValueError("timeout waiting for password prompt")

    write_ser_line(ser, password)
    time.sleep(1)
    print(time.time(), "Waiting for shell prompt...")
    got_shell_prompt = wait_ser_text(ser, prompt_shell, timeout=long_wait)
    if not got_shell_prompt:
        raise ValueError("timeout waiting for shell prompt")
def capture_vnc_screenshot(screenshot_file):
    """Save a screenshot of the VNC display on localhost to *screenshot_file*.

    Uses the external ``vncsnapshot`` tool. If it is not installed, or if
    it exits with a non-zero status, only a warning is printed.
    """
    if shutil.which("vncsnapshot") is None:
        print("WARN: vncsnapshot not available, skipping vnc snapshot capturing.")
        return

    print("Trying to capture screenshot via vncsnapshot to", screenshot_file)
    exit_status = subprocess.call(["vncsnapshot", "localhost", screenshot_file])
    if exit_status == 0:
        print("Screenshot file '%s' available" % os.path.abspath(screenshot_file))
    else:
        print("WARN: failed to capture vnc snapshot :(")
def find_serial_port_from_qemu_log(qemu_log, tries):
    """Poll the QEMU log until it announces the redirected serial device.

    The log is re-read up to *tries* times, five seconds apart, looking
    for a line starting with ``char device redirected to <port>``.

    Args:
        qemu_log: path of the QEMU log file to scan.
        tries: maximum number of times the log is read.

    Returns:
        The port named in the first matching line, or ``None`` if no such
        line was found (the caller has to deal with that).
    """
    redirect_re = re.compile(r"char device redirected to ([^ ]+)")
    port = None
    # Pre-set so the summary below also works if the loop never runs
    # (tries <= 0) or the log file could not be read.
    qemu_log_messages = []

    for i in range(tries):
        print("Waiting for qemu to present serial console [try %s]" % (i, ))
        try:
            with open(qemu_log, "r", encoding="utf-8") as fp:
                qemu_log_messages = fp.read().splitlines()
        except FileNotFoundError:
            # The log may not have been created yet; keep waiting for it
            # instead of aborting with a traceback.
            print("qemu log %s does not exist (yet)" % (qemu_log, ))
            qemu_log_messages = []

        for line in qemu_log_messages:
            m = redirect_re.match(line)
            if m:
                port = m.group(1)
                break
        if port:
            break
        # Only pause when another attempt follows.
        if i + 1 < tries:
            time.sleep(5)

    print("qemu log (up to char device redirect) follows:")
    print("\n".join(qemu_log_messages))
    print()
    return port  # might be None, caller has to deal with it
def _login_with_retries(ser, port, args):
    """Try to log in up to ``args.tries`` times; return True on success.

    A failed attempt is reported and followed by a five second pause. The
    loop also stops once more than ``args.timeout`` seconds have passed
    since the first attempt.
    """
    ts_start = time.time()
    for i in range(args.tries):
        try:
            print("Logging into %s via serial console [try %s]" % (port, i))
            login(ser, args.hostname, args.user, args.password, 30)
            return True
        except Exception as except_inst:
            # Any failure (timeouts, serial errors) just triggers a retry.
            print("Login failure (try %s):" % (i,), except_inst)
            time.sleep(5)
        if time.time() - ts_start > args.timeout:
            print("E: Timeout reached waiting for login prompt")
            break
    return False


def main():
    """Log in on the QEMU serial console and run the requested commands.

    The serial port is taken from the QEMU log. After a successful login
    every command from the command line is sent and the output read back
    is printed; with ``--poweroff`` a final ``poweroff`` is sent as well.

    Exits with status 1 if no serial port is found or if logging in
    fails; in the latter case a VNC screenshot is attempted first.
    """
    args = parser.parse_args()
    qemu_log = args.qemu_log

    port = find_serial_port_from_qemu_log(qemu_log, args.tries)
    if not port:
        print()
        print("E: no serial port found in qemu log", qemu_log)
        sys.exit(1)

    ser = serial.Serial(port, 115200)
    try:
        # Discard whatever is still buffered from before we connected.
        ser.reset_input_buffer()
        ser.reset_output_buffer()

        success = _login_with_retries(ser, port, args)

        if success:
            write_ser_line(ser, "")
            ser.timeout = 30
            print_ser_lines(ser)
            print("Running commands...")
            for command in args.command:
                write_ser_line(ser, command)
                print_ser_lines(ser)
            if args.poweroff:
                print("Sending final poweroff command...")
                write_ser_line(ser, "poweroff")
                ser.flush()
                # after poweroff, the serial device will probably vanish.
                # do not attempt reading from it anymore.
    finally:
        # Always release the port; a failure to close (e.g. because the
        # device already vanished) must not mask the actual result.
        try:
            ser.close()
        except OSError as close_err:
            print("W: failed to close serial port:", close_err)

    if not success:
        print("W: Running tests failed, saving screenshot")
        capture_vnc_screenshot(args.screenshot)
        sys.exit(1)


if __name__ == "__main__":
    main()