1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
# type: ignore
import os
from textwrap import dedent
from behave import then, when
import wrappers
@when("we start external editor providing a file name")
def step_edit_file(context):
    """Launch the external editor on a freshly named SQL file.

    The target path is built from ``context.package_root`` and
    ``context.conf['vi']`` and remembered on ``context.editor_file_name``.
    Any copy already on disk is deleted before the editor command is sent,
    after which the step waits for the Ex-mode banner and the ``:`` prompt.
    """
    target = os.path.join(
        context.package_root,
        f"test_file_{context.conf['vi']}.sql",
    )
    context.editor_file_name = target
    # Remove a copy that is already on disk before opening the editor.
    if os.path.exists(target):
        os.remove(target)
    edit_command = f"\\e {os.path.basename(target)}"
    context.cli.sendline(edit_command)
    banner = 'Entering Ex mode. Type "visual" to go to Normal mode.'
    wrappers.expect_exact(context, banner, timeout=4)
    wrappers.expect_exact(context, "\r\n:", timeout=4)
@when('we type "{query}" in the editor')
def step_edit_type_sql(context, query):
    """Send ``i``, the query text and ``.`` to the editor, one line each.

    After the three lines are sent, the step waits for the ``:`` prompt
    to reappear.
    """
    for line in ("i", query, "."):
        context.cli.sendline(line)
    wrappers.expect_exact(context, "\r\n:", timeout=4)
@when("we exit the editor")
def step_edit_quit(context):
    """Send ``x`` to the editor and wait for the text ``written``."""
    quit_command = "x"
    context.cli.sendline(quit_command)
    wrappers.expect_exact(context, "written", timeout=4)
@then('we see "{query}" in prompt')
def step_edit_done_sql(context, query):
    """Expect each space-separated piece of *query*, then clean up.

    Cleanup sends Ctrl-C to the CLI and removes the file recorded in
    ``context.editor_file_name`` when that name is set and the file exists.
    """
    pieces = query.split(" ")
    for piece in pieces:
        wrappers.expect_exact(context, piece, timeout=5)

    # Clear whatever is left on the command line.
    context.cli.sendcontrol("c")

    # Delete the file that was edited.
    edited = context.editor_file_name
    if not edited:
        return
    if os.path.exists(edited):
        os.remove(edited)
@when("we tee output")
def step_tee_ouptut(context):
    """Send a ``tee`` command that targets a freshly named file.

    The path is built from ``context.package_root`` and
    ``context.conf['vi']`` and stored on ``context.tee_file_name``. A copy
    already on disk is deleted before the ``tee <basename>`` command is sent.
    """
    destination = os.path.join(
        context.package_root,
        f"tee_file_{context.conf['vi']}.sql",
    )
    context.tee_file_name = destination
    # Remove a copy that is already on disk before sending the command.
    if os.path.exists(destination):
        os.remove(destination)
    tee_command = f"tee {os.path.basename(destination)}"
    context.cli.sendline(tee_command)
@when('we select "select {param}"')
def step_query_select_number(context, param):
    """Send ``select <param>`` and expect a small ASCII table in the pager.

    The expected text is five CRLF-separated lines: a border, a row holding
    *param*, a border, the same row again, and a closing border, followed by
    two CRLF pairs. The step then expects the text ``1 row in set``.
    """
    context.cli.sendline(f"select {param}")

    # Both table rows show the same text, so build each piece once.
    border = f"+{'-' * (len(param) + 2)}+"
    row = f"| {param} |"
    table = dedent(
        f"""
        {border}\r
        {row}\r
        {border}\r
        {row}\r
        {border}
        """
    ).strip()

    wrappers.expect_pager(context, table + "\r\n\r\n", timeout=5)
    wrappers.expect_exact(context, "1 row in set", timeout=2)
@then('we see tabular result "{result}"')
def step_see_tabular_result(context, result):
    """Expect *result* rendered as a table cell (``| result |``)."""
    cell = f"| {result} |"
    wrappers.expect_exact(context, cell, timeout=2)
@then('we see csv result "{result}"')
def step_see_csv_result(context, result):
    """Expect *result* wrapped in double quotes."""
    quoted = f'"{result}"'
    wrappers.expect_exact(context, quoted, timeout=2)
@when('we query "{query}"')
def step_query(context, query):
    """Send *query* to the CLI as one line, without waiting for output."""
    cli = context.cli
    cli.sendline(query)
@when("we notee output")
def step_notee_output(context):
    """Send the ``notee`` command to the CLI."""
    cli = context.cli
    cli.sendline("notee")
@then("we see 123456 in tee output")
def step_see_123456_in_ouput(context):
    """Assert that the tee file contains ``123456`` and always delete it.

    The file path is read from ``context.tee_file_name``. Cleanup runs in a
    ``finally`` clause so the file is removed even when the assertion (or
    the read) fails.

    Raises:
        AssertionError: if ``123456`` is not present in the tee file.
        OSError: if the tee file cannot be opened.
    """
    tee_path = context.tee_file_name
    try:
        with open(tee_path) as f:
            contents = f.read()
        assert "123456" in contents, f"'123456' not found in tee file {tee_path}"
    finally:
        # The file may not exist if opening it failed, so check before removing.
        if os.path.exists(tee_path):
            os.remove(tee_path)
@then('we see csv {result} in file output')
def step_see_csv_result_in_redirected_ouput(context, result):
    """Expect the double-quoted *result*, then delete ``/tmp/output1.csv``.

    The expectation goes through ``wrappers.expect_exact`` on *context*;
    the file is removed afterwards only if it exists.
    """
    quoted = f'"{result}"'
    wrappers.expect_exact(context, quoted, timeout=2)

    leftover = "/tmp/output1.csv"
    if not os.path.exists(leftover):
        return
    os.remove(leftover)
@then('we see text {result} in file output')
def step_see_text_result_in_redirected_ouput(context, result):
    """Expect *result* preceded by a space, then delete ``/tmp/output1.txt``.

    The expectation goes through ``wrappers.expect_exact`` on *context*;
    the file is removed afterwards only if it exists.
    """
    padded = f" {result}"
    wrappers.expect_exact(context, padded, timeout=2)

    leftover = "/tmp/output1.txt"
    if not os.path.exists(leftover):
        return
    os.remove(leftover)
@then("we see space 12 in command output")
def step_see_space_12_in_command_ouput(context):
    """Expect the text ``12`` preceded by a single space."""
    expected = " 12"
    wrappers.expect_exact(context, expected, timeout=2)
@then("we see space 6 in command output")
def step_see_space_6_in_command_ouput(context):
    """Expect the text ``6`` preceded by a single space."""
    expected = " 6"
    wrappers.expect_exact(context, expected, timeout=2)
@then('delimiter is set to "{delimiter}"')
def delimiter_is_set(context, delimiter):
    """Expect the confirmation message naming the new *delimiter*."""
    confirmation = f"Changed delimiter to {delimiter}"
    wrappers.expect_exact(context, confirmation, timeout=2)
|