File: serial-console-connection

package info (click to toggle)
grml-debootstrap 0.123
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 464 kB
  • sloc: sh: 2,562; python: 165; makefile: 72; ansic: 49
file content (198 lines) | stat: -rwxr-xr-x 5,945 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/env python3
import argparse
import re
import serial
import os
import shutil
import subprocess
import sys
import time

parser = argparse.ArgumentParser(
    description="Connect to serial console to execute stuff",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
    "--qemu-log",
    dest="qemu_log",
    required=True,
    help="QEMU log to look for serial port info in",
)
parser.add_argument(
    "--hostname", default="trixie", help="hostname of the system for login process"
)
parser.add_argument("--user", default="root", help="user name to use for login")
parser.add_argument("--password", default="grml", help="password for login")
parser.add_argument(
    "--timeout",
    default="180",
    type=int,
    help="Maximum time for finding the login prompt, in seconds",
)
parser.add_argument(
    "--screenshot",
    default="screenshot.jpg",
    help="file name for screenshot captured via VNC on error",
)
parser.add_argument(
    "--tries",
    default="12",
    type=int,
    help="Number of retries for finding the login prompt",
)
parser.add_argument(
    "--poweroff",
    action="store_true",
    default=False,
    help='send "poweroff" command after all other commands',
)
parser.add_argument("command", nargs="+", help="command to execute after logging in")


def print_ser_lines(ser):
    for line in ser.readlines():
        print("<<", line)  # line will be a binary string


def write_ser_line(ser, text):
    print(">>", text)
    ser.write(("%s\n" % text).encode())
    ser.flush()


def wait_ser_text(ser, texts, timeout):
    if isinstance(texts, str):
        texts = [texts]

    ts_max = time.time() + timeout
    ser.timeout = 1
    texts_encoded = [text.encode("utf-8") for text in texts]
    found = False
    print(time.time(), "D: expecting one of", texts_encoded)
    while ts_max > time.time() and found is False:
        for line in ser.readlines():
            print(time.time(), "<<", line)  # line will be a binary string
            for index, text in enumerate(texts_encoded):
                if text in line:
                    found = texts[index]
                    break
    return found


def login(ser, hostname, user, password, timeout):
    login_prompt = "%s login:" % hostname
    shell_prompt = "%s@%s" % (user, hostname)

    write_ser_line(ser, "")  # send newline

    found = wait_ser_text(ser, [shell_prompt, login_prompt], timeout=timeout * 4)
    if found == shell_prompt:
        return
    elif found is False:
        raise ValueError("timeout waiting for login prompt")

    print("Logging in...")
    write_ser_line(ser, user)
    if not wait_ser_text(ser, "Password:", timeout=timeout):
        raise ValueError("timeout waiting for password prompt")

    write_ser_line(ser, password)
    time.sleep(1)

    print(time.time(), "Waiting for shell prompt...")
    if not wait_ser_text(ser, shell_prompt, timeout=timeout * 4):
        raise ValueError("timeout waiting for shell prompt")


def capture_vnc_screenshot(screenshot_file):
    if not shutil.which("vncsnapshot"):
        print("WARN: vncsnapshot not available, skipping vnc snapshot capturing.")
        return

    print("Trying to capture screenshot via vncsnapshot to", screenshot_file)

    proc = subprocess.Popen(["vncsnapshot", "localhost", screenshot_file])
    proc.wait()
    if proc.returncode != 0:
        print("WARN: failed to capture vnc snapshot :(")
    else:
        print("Screenshot file '%s' available" % os.path.abspath(screenshot_file))


def find_serial_port_from_qemu_log(qemu_log, tries):
    port = None
    for i in range(tries):
        print("Waiting for qemu to present serial console [try %s]" % (i, ))
        with open(qemu_log, "r", encoding="utf-8") as fp:
            qemu_log_messages = fp.read().splitlines()
        for line in qemu_log_messages:
            m = re.match("char device redirected to ([^ ]+)", line)
            if m:
                port = m.group(1)
                break
        if port:
            break
        time.sleep(5)

    print("qemu log (up to char device redirect) follows:")
    print("\n".join(qemu_log_messages))
    print()
    return port  # might be None, caller has to deal with it


def main():
    args = parser.parse_args()
    hostname = args.hostname
    password = args.password
    qemu_log = args.qemu_log
    user = args.user
    commands = args.command
    screenshot_file = args.screenshot

    port = find_serial_port_from_qemu_log(qemu_log, args.tries)
    if not port:
        print()
        print("E: no serial port found in qemu log", qemu_log)
        sys.exit(1)

    ser = serial.Serial(port, 115200)
    ser.flushInput()
    ser.flushOutput()

    success = False
    ts_start = time.time()
    for i in range(args.tries):
        try:
            print("Logging into %s via serial console [try %s]" % (port, i))
            login(ser, hostname, user, password, 30)
            success = True
            break
        except Exception as except_inst:
            print("Login failure (try %s):" % (i,), except_inst)
            time.sleep(5)
        if time.time() - ts_start > args.timeout:
            print("E: Timeout reached waiting for login prompt")
            break

    if success:
        write_ser_line(ser, "")
        ser.timeout = 30
        print_ser_lines(ser)
        print("Running commands...")
        for command in commands:
            write_ser_line(ser, command)
            print_ser_lines(ser)
        if args.poweroff:
            print("Sending final poweroff command...")
            write_ser_line(ser, "poweroff")
            ser.flush()
            # after poweroff, the serial device will probably vanish. do not attempt reading from it anymore.

    if not success:
        print("W: Running tests failed, saving screenshot")
        capture_vnc_screenshot(screenshot_file)
        sys.exit(1)


if __name__ == "__main__":
    main()