File: basic_replication.py

package info (click to toggle)
patroni 4.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,704 kB
  • sloc: python: 29,743; sh: 573; makefile: 29
file content (144 lines) | stat: -rw-r--r-- 5,298 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import json

from time import sleep, time

import parse

from behave import register_type, step, then

import patroni.psycopg as pg


@parse.with_pattern(r'[a-z][a-z0-9_\-]*[a-z0-9]')
def parse_name(text):
    return text


register_type(name=parse_name)


@step('I start {name:name}')
def start_patroni(context, name):
    return context.pctl.start(name)


@step('I start duplicate {name:name} on port {port:d}')
def start_duplicate_patroni(context, name, port):
    config = {
        "name": name,
        "restapi": {
            "listen": "127.0.0.1:{0}".format(port)
        }
    }
    try:
        context.pctl.start('dup-' + name, custom_config=config)
        assert False, "Process was expected to fail"
    except AssertionError as e:
        assert 'is not running after being started' in str(e), \
            "No error was raised by duplicate start of {0} ".format(name)


@step('I shut down {name:name}')
def stop_patroni(context, name):
    return context.pctl.stop(name, timeout=60)


@step('I kill {name:name}')
def kill_patroni(context, name):
    return context.pctl.stop(name, kill=True)


@step('I shut down postmaster on {name:name}')
def stop_postgres(context, name):
    return context.pctl.stop(name, postgres=True)


@step('I kill postmaster on {name:name}')
def kill_postgres(context, name):
    return context.pctl.stop(name, kill=True, postgres=True)


def get_wal_name(context, pg_name):
    version = context.pctl.query(pg_name, "SHOW server_version_num").fetchone()[0]
    return 'xlog' if int(version) / 10000 < 10 else 'wal'


@step('I add the table {table_name:w} to {pg_name:name}')
def add_table(context, table_name, pg_name):
    # parse the configuration file and get the port
    try:
        context.pctl.query(pg_name, "CREATE TABLE public.{0}()".format(table_name))
        context.pctl.query(pg_name, "SELECT pg_switch_{0}()".format(get_wal_name(context, pg_name)))
    except pg.Error as e:
        assert False, "Error creating table {0} on {1}: {2}".format(table_name, pg_name, e)


@step('I {action:w} wal replay on {pg_name:name}')
def toggle_wal_replay(context, action, pg_name):
    # pause or resume the wal replay process
    try:
        context.pctl.query(pg_name, "SELECT pg_{0}_replay_{1}()".format(get_wal_name(context, pg_name), action))
    except pg.Error as e:
        assert False, "Error during {0} wal recovery on {1}: {2}".format(action, pg_name, e)


@step('I {action:w} table on {pg_name:name}')
def crdr_mytest(context, action, pg_name):
    try:
        if (action == "create"):
            context.pctl.query(pg_name, "create table if not exists public.mytest(id numeric)")
        else:
            context.pctl.query(pg_name, "drop table if exists public.mytest")
    except pg.Error as e:
        assert False, "Error {0} table mytest on {1}: {2}".format(action, pg_name, e)


@step('I load data on {pg_name:name}')
def initiate_load(context, pg_name):
    # perform dummy load
    try:
        context.pctl.query(pg_name, "insert into public.mytest select r::numeric from generate_series(1, 350000) r")
    except pg.Error as e:
        assert False, "Error loading test data on {0}: {1}".format(pg_name, e)


@then('table {table_name:w} is present on {pg_name:name} after {max_replication_delay:d} seconds')
def table_is_present_on(context, table_name, pg_name, max_replication_delay):
    max_replication_delay *= context.timeout_multiplier
    for _ in range(int(max_replication_delay)):
        if context.pctl.query(pg_name, "SELECT 1 FROM public.{0}".format(table_name), fail_ok=True) is not None:
            break
        sleep(1)
    else:
        assert False, \
            "Table {0} is not present on {1} after {2} seconds".format(table_name, pg_name, max_replication_delay)


@then('{pg_name:name} role is the {pg_role:w} after {max_promotion_timeout:d} seconds')
def check_role(context, pg_name, pg_role, max_promotion_timeout):
    max_promotion_timeout *= context.timeout_multiplier
    assert context.pctl.check_role_has_changed_to(pg_name, pg_role, timeout=int(max_promotion_timeout)), \
        "{0} role didn't change to {1} after {2} seconds".format(pg_name, pg_role, max_promotion_timeout)


@step('replication works from {primary:name} to {replica:name} after {time_limit:d} seconds')
@then('replication works from {primary:name} to {replica:name} after {time_limit:d} seconds')
def replication_works(context, primary, replica, time_limit):
    context.execute_steps("""
        When I add the table test_{0} to {1}
        Then table test_{0} is present on {2} after {3} seconds
    """.format(str(time()).replace('.', '_').replace(',', '_'), primary, replica, time_limit))


@step('there is one of {message_list} {level:w} in the {node} patroni log after {timeout:d} seconds')
def check_patroni_log(context, message_list, level, node, timeout):
    timeout *= context.timeout_multiplier
    message_list = json.loads(message_list)

    for _ in range(int(timeout)):
        messages_of_level = context.pctl.read_patroni_log(node, level)
        if any(any(message in line for line in messages_of_level) for message in message_list):
            break
        sleep(1)
    else:
        assert False, f"There were none of {message_list} {level} in the {node} patroni log after {timeout} seconds"