1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
|
# type: ignore
from io import StringIO
import re
import sys
import textwrap
import pexpect
def expect_exact(context, expected, timeout):
    """Wait for ``expected`` in the CLI output; raise a descriptive error on timeout.

    Args:
        context: Object exposing ``cli`` (the spawned pexpect process) and
            ``logfile`` (a buffer offering ``getvalue()``).
        expected: Pattern(s) forwarded unchanged to ``context.cli.expect_exact``.
        timeout: Timeout forwarded unchanged to ``context.cli.expect_exact``.

    Raises:
        Exception: If pexpect reports a timeout. The message contains the
            expected value, the output received so far (``context.cli.before``)
            with terminal escape sequences removed, and the full session log.
    """
    try:
        context.cli.expect_exact(expected, timeout=timeout)
    except pexpect.TIMEOUT:
        # Handled below: the error is raised outside this ``except`` block so
        # the new exception does not carry the pexpect timeout as its context.
        pass
    else:
        return

    # Strip color codes (and other CSI escape sequences) out of the output.
    # A CSI sequence is ESC '[' + parameter bytes (0x30-0x3F) + intermediate
    # bytes (0x20-0x2F) + exactly one final byte (0x40-0x7E).  Stopping at the
    # final byte matters: a pattern that lets letters repeat would also eat
    # the ordinary text following a sequence (e.g. the "ERROR" in
    # "\x1b[31mERROR").
    actual = re.sub(r"\x1b\[[0-?]*[ -/]*[@-~]", "", context.cli.before)
    raise Exception(
        textwrap.dedent(
            f"""\
            Expected:
            ---
            {expected!r}
            ---
            Actual:
            ---
            {actual!r}
            ---
            Full log:
            ---
            {context.logfile.getvalue()!r}
            ---
            """
        )
    )
def expect_pager(context, expected, timeout):
    """Expect ``expected`` framed by the configured pager boundary markers.

    The text waited for is the ``pager_boundary`` value from ``context.conf``
    followed by CRLF, then ``expected``, then the boundary and CRLF again.
    Matching and timeout handling are delegated to ``expect_exact``.
    """
    boundary = context.conf["pager_boundary"]
    framed_output = f"{boundary}\r\n{expected}{boundary}\r\n"
    expect_exact(context, framed_output, timeout=timeout)
def run_cli(context, run_args=None, exclude_args=None):
    """Spawn the CLI under pexpect and attach the session to ``context``.

    The effective settings are ``context.conf`` overlaid with ``run_args``.
    Well-known settings are translated into command-line options in a fixed
    order; any setting whose key starts with ``-`` is passed through as-is
    (key only when its value is ``None``, key plus value otherwise).

    Args:
        context: Object providing ``conf`` (settings mapping) and
            ``package_root`` (working directory for the spawned process).
        run_args: Optional mapping of per-invocation overrides / extra options.
        exclude_args: Optional iterable of option names to leave out of the
            command line. Well-known options use the names listed in the table
            below; pass-through options use their own key.

    Side effects:
        Sets ``context.cli``, ``context.logfile``, ``context.exit_sent`` and
        ``context.currentdb`` (the latter is read from ``context.conf``, not
        from the overlaid settings).
    """
    overrides = run_args or {}
    excluded = set(exclude_args) if exclude_args else set()
    settings = dict(**context.conf)
    settings.update(overrides)

    argv = []

    def emit(name, flag, value):
        # Excluded options are dropped entirely; a None value means "bare flag".
        if name in excluded:
            return
        if value is None:
            argv.append(flag)
        else:
            argv.extend((flag, value))

    # (settings key, name checked against exclude_args, command-line flag),
    # listed in the order the options must appear on the command line.
    known_options = (
        ("host", "host", "-h"),
        ("user", "user", "-u"),
        ("pass", "pass", "-p"),
        ("port", "port", "-P"),
        ("dbname", "dbname", "-D"),
        ("defaults-file", "defaults_file", "--defaults-file"),
        ("myclirc", "myclirc", "--myclirc"),
        ("login_path", "login_path", "--login-path"),
    )
    for settings_key, name, flag in known_options:
        value = settings.get(settings_key)
        if not value:
            continue
        # Only the port is converted to text; other values are used as given.
        emit(name, flag, str(value) if settings_key == "port" else value)

    # Pass through raw options, i.e. settings whose key already looks like a flag.
    for option, option_value in settings.items():
        if option.startswith("-"):
            emit(option, option, option_value)

    if "cli_command" in context.conf:
        executable = context.conf["cli_command"]
    else:
        executable = f'{sys.executable} -c "import coverage ; coverage.process_startup(); import mycli.main; mycli.main.cli()"'

    command_line = " ".join([executable, *argv])
    context.cli = pexpect.spawnu(command_line, cwd=context.package_root)

    # Capture everything the session reads/writes so failures can show it.
    context.logfile = StringIO()
    context.cli.logfile = context.logfile
    context.exit_sent = False
    context.currentdb = context.conf["dbname"]
def wait_prompt(context, prompt=None):
    """Wait until the prompt is displayed, then mark the context as at the prompt.

    When ``prompt`` is not given, the expected prompt is built as
    ``<user>@<host>:<currentdb>>`` from ``context.conf`` and
    ``context.currentdb`` and passed on as a one-element tuple. The wait uses
    a fixed timeout of 5 and is delegated to ``expect_exact``.
    """
    if prompt is None:
        settings = context.conf
        default_prompt = f"{settings['user']}@{settings['host']}:{context.currentdb}>"
        prompt = (default_prompt,)
    expect_exact(context, prompt, timeout=5)
    context.atprompt = True
|