"""Behave step definitions for starting, stopping and querying Patroni nodes in tests."""
import json
from time import sleep, time
import parse
from behave import register_type, step, then
import patroni.psycopg as pg
@parse.with_pattern(r'[a-z][a-z0-9_\-]*[a-z0-9]')
def parse_name(text):
    """Return the matched text unchanged.

    All validation is done by the pattern attached above: a lowercase letter,
    then any run of lowercase letters, digits, ``_`` or ``-``, and finally a
    lowercase letter or digit.
    """
    return text


# Make the converter available to step patterns as the ``name`` type,
# i.e. placeholders written as ``{something:name}``.
register_type(name=parse_name)
@step('I start {name:name}')
def start_patroni(context, name):
    """Start the node called ``name`` and return whatever ``context.pctl.start`` returns."""
    controller = context.pctl
    return controller.start(name)
@step('I start duplicate {name:name} on port {port:d}')
def start_duplicate_patroni(context, name, port):
    """Start a second process that reuses ``name`` and require that it fails to come up.

    The duplicate is started as ``dup-<name>`` with a custom config carrying
    the same ``name`` and a REST API listening on ``127.0.0.1:<port>``. The step
    passes only when the start raises an ``AssertionError`` whose text contains
    ``'is not running after being started'``.
    """
    listen_address = "127.0.0.1:{0}".format(port)
    config = {
        "name": name,
        "restapi": {"listen": listen_address},
    }
    try:
        context.pctl.start('dup-' + name, custom_config=config)
        # Reaching this line means the start succeeded. The AssertionError
        # raised here is caught by the handler below, whose message check
        # then fails the step.
        assert False, "Process was expected to fail"
    except AssertionError as e:
        failure_text = str(e)
        assert 'is not running after being started' in failure_text, \
            "No error was raised by duplicate start of {0} ".format(name)
@step('I shut down {name:name}')
def stop_patroni(context, name):
    """Stop the node called ``name`` with a 60 second timeout and return the controller's result."""
    controller = context.pctl
    return controller.stop(name, timeout=60)
@step('I kill {name:name}')
def kill_patroni(context, name):
    """Stop the node called ``name`` with ``kill=True`` and return the controller's result."""
    controller = context.pctl
    return controller.stop(name, kill=True)
@step('I shut down postmaster on {name:name}')
def stop_postgres(context, name):
    """Stop ``name`` with ``postgres=True`` and return the controller's result."""
    controller = context.pctl
    return controller.stop(name, postgres=True)
@step('I kill postmaster on {name:name}')
def kill_postgres(context, name):
    """Stop ``name`` with both ``kill=True`` and ``postgres=True`` and return the controller's result."""
    controller = context.pctl
    return controller.stop(name, kill=True, postgres=True)
def get_wal_name(context, pg_name):
    """Return ``'xlog'`` or ``'wal'`` based on the server version reported by ``pg_name``.

    ``SHOW server_version_num`` is queried on the node; when that number
    divided by 10000 is below 10 the result is ``'xlog'``, otherwise ``'wal'``.
    """
    cursor = context.pctl.query(pg_name, "SHOW server_version_num")
    version_num = int(cursor.fetchone()[0])
    if version_num / 10000 < 10:
        return 'xlog'
    return 'wal'
@step('I add the table {table_name:w} to {pg_name:name}')
def add_table(context, table_name, pg_name):
    """Create an empty ``public.<table_name>`` on ``pg_name``, then call ``pg_switch_<xlog|wal>()``.

    A ``pg.Error`` raised by any of the queries fails the step with a message
    naming the table, the node and the error.
    """
    try:
        create_sql = "CREATE TABLE public.{0}()".format(table_name)
        context.pctl.query(pg_name, create_sql)
        # The switch function is named after what get_wal_name reports for this node.
        switch_sql = "SELECT pg_switch_{0}()".format(get_wal_name(context, pg_name))
        context.pctl.query(pg_name, switch_sql)
    except pg.Error as e:
        assert False, "Error creating table {0} on {1}: {2}".format(table_name, pg_name, e)
@step('I {action:w} wal replay on {pg_name:name}')
def toggle_wal_replay(context, action, pg_name):
    """Run ``pg_<xlog|wal>_replay_<action>()`` on ``pg_name``.

    ``action`` is inserted into the function name as given by the step text.
    A ``pg.Error`` fails the step.
    """
    try:
        wal_name = get_wal_name(context, pg_name)
        statement = "SELECT pg_{0}_replay_{1}()".format(wal_name, action)
        context.pctl.query(pg_name, statement)
    except pg.Error as e:
        assert False, "Error during {0} wal recovery on {1}: {2}".format(action, pg_name, e)
@step('I {action:w} table on {pg_name:name}')
def crdr_mytest(context, action, pg_name):
    """Create or drop ``public.mytest`` on ``pg_name``.

    ``action == "create"`` creates the table if it is missing; any other
    action drops it if it exists. A ``pg.Error`` fails the step.
    """
    if action == "create":
        statement = "create table if not exists public.mytest(id numeric)"
    else:
        statement = "drop table if exists public.mytest"
    try:
        context.pctl.query(pg_name, statement)
    except pg.Error as e:
        assert False, "Error {0} table mytest on {1}: {2}".format(action, pg_name, e)
@step('I load data on {pg_name:name}')
def initiate_load(context, pg_name):
    """Insert the values 1..350000 into ``public.mytest`` on ``pg_name``.

    A ``pg.Error`` fails the step.
    """
    load_sql = "insert into public.mytest select r::numeric from generate_series(1, 350000) r"
    try:
        context.pctl.query(pg_name, load_sql)
    except pg.Error as e:
        assert False, "Error loading test data on {0}: {1}".format(pg_name, e)
@then('table {table_name:w} is present on {pg_name:name} after {max_replication_delay:d} seconds')
def table_is_present_on(context, table_name, pg_name, max_replication_delay):
    """Poll ``pg_name`` once per second until ``public.<table_name>`` can be selected from.

    The delay from the step text is scaled by ``context.timeout_multiplier``
    and its integer part is the number of attempts. The step fails when no
    attempt returns a non-``None`` query result.
    """
    max_replication_delay *= context.timeout_multiplier
    attempts_left = int(max_replication_delay)
    probe_sql = "SELECT 1 FROM public.{0}".format(table_name)
    while attempts_left > 0:
        # fail_ok=True: a failed query is reported as None instead of raising here.
        if context.pctl.query(pg_name, probe_sql, fail_ok=True) is not None:
            return
        sleep(1)
        attempts_left -= 1
    assert False, \
        "Table {0} is not present on {1} after {2} seconds".format(table_name, pg_name, max_replication_delay)
@then('{pg_name:name} role is the {pg_role:w} after {max_promotion_timeout:d} seconds')
def check_role(context, pg_name, pg_role, max_promotion_timeout):
    """Assert that ``pg_name`` reaches role ``pg_role`` within the scaled timeout.

    The timeout from the step text is multiplied by
    ``context.timeout_multiplier``; its integer part is passed to
    ``context.pctl.check_role_has_changed_to``.
    """
    max_promotion_timeout *= context.timeout_multiplier
    role_changed = context.pctl.check_role_has_changed_to(
        pg_name, pg_role, timeout=int(max_promotion_timeout))
    assert role_changed, \
        "{0} role didn't change to {1} after {2} seconds".format(pg_name, pg_role, max_promotion_timeout)
@step('replication works from {primary:name} to {replica:name} after {time_limit:d} seconds')
@then('replication works from {primary:name} to {replica:name} after {time_limit:d} seconds')
def replication_works(context, primary, replica, time_limit):
    """Check replication by creating a table on ``primary`` and waiting for it on ``replica``.

    The table is named ``test_<timestamp>``, where the timestamp is the
    current ``time()`` with ``.`` and ``,`` replaced by ``_``. The work is
    delegated to two other steps run through ``context.execute_steps``.
    """
    table_suffix = str(time()).replace('.', '_').replace(',', '_')
    steps_template = (
        "\n"
        "When I add the table test_{0} to {1}\n"
        "Then table test_{0} is present on {2} after {3} seconds\n"
    )
    context.execute_steps(steps_template.format(table_suffix, primary, replica, time_limit))
@step('there is one of {message_list} {level:w} in the {node} patroni log after {timeout:d} seconds')
def check_patroni_log(context, message_list, level, node, timeout):
    """Poll the patroni log of ``node`` until any expected message shows up at ``level``.

    ``message_list`` arrives as JSON text and is decoded with ``json.loads``.
    The timeout is scaled by ``context.timeout_multiplier`` and its integer
    part is the number of one-second attempts. A message counts as found
    when it is a substring of any line returned by
    ``context.pctl.read_patroni_log(node, level)``.
    """
    timeout *= context.timeout_multiplier
    message_list = json.loads(message_list)
    for _ in range(int(timeout)):
        log_lines = context.pctl.read_patroni_log(node, level)
        for wanted in message_list:
            if any(wanted in line for line in log_lines):
                return
        sleep(1)
    assert False, f"There were none of {message_list} {level} in the {node} patroni log after {timeout} seconds"
|