File: quorum_commit.py

package info (click to toggle)
patroni 4.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,704 kB
  • sloc: python: 29,743; sh: 573; makefile: 29
file content (60 lines) | stat: -rw-r--r-- 2,245 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import json
import re
import time

from behave import step, then


@step('sync key in DCS has {key:w}={value} after {time_limit:d} seconds')
def check_sync(context, key, value, time_limit):
    time_limit *= context.timeout_multiplier
    max_time = time.time() + int(time_limit)
    dcs_value = None
    while time.time() < max_time:
        try:
            response = json.loads(context.dcs_ctl.query('sync'))
            dcs_value = response.get(key)
            if key == 'sync_standby' and set((dcs_value or '').split(',')) == set(value.split(',')):
                return
            elif str(dcs_value) == value:
                return
        except Exception:
            pass
        time.sleep(1)
    assert False, "sync does not have {0}={1} (found {2}) in dcs after {3} seconds".format(key, value,
                                                                                           dcs_value, time_limit)


def _parse_synchronous_standby_names(value):
    if '(' in value:
        m = re.match(r'.*(\d+) \(([^)]+)\)', value)
        expected_value = set(m.group(2).split())
        expected_num = m.group(1)
    else:
        expected_value = set([value])
        expected_num = '1'
    return expected_num, expected_value


@then("synchronous_standby_names on {name:2} is set to '{value}' after {time_limit:d} seconds")
def check_synchronous_standby_names(context, name, value, time_limit):
    time_limit *= context.timeout_multiplier
    max_time = time.time() + int(time_limit)

    if value == '_empty_str_':
        value = ''

    expected_num, expected_value = _parse_synchronous_standby_names(value)

    ssn = None
    while time.time() < max_time:
        try:
            ssn = context.pctl.query(name, "SHOW synchronous_standby_names").fetchone()[0]
            db_num, db_value = _parse_synchronous_standby_names(ssn)
            if expected_value == db_value and expected_num == db_num:
                return
        except Exception:
            pass
        time.sleep(1)
    assert False, "synchronous_standby_names is not set to '{0}' (found '{1}') after {2} seconds".format(value, ssn,
                                                                                                         time_limit)