File: conflict.test.py

Package: tarantool 2.6.0-1.4
from lib.tarantool_server import TarantoolServer
from time import sleep
import yaml

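# Select from box.space.test on every node; the output is not silenced, so
# the rows land in the test log and can be compared across nodes.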
def check_replication(nodes, select_args=''):
    for node in nodes:
        node.admin('box.space.test:select{%s}' % select_args)

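# 'server' is the default instance provided by the test harness; use it as
# the master and allow guests to connect as replication peers.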
master = server
master.admin("box.schema.user.grant('guest', 'replication')")

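# Deploy a second instance that replicates from the master.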
replica = TarantoolServer(server.ini)
replica.script = 'replication-py/replica.lua'
replica.vardir = server.vardir
replica.rpl_master = master
replica.deploy()

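# Send cmd1 to the master and cmd2 to the replica over their admin sockets
# at (almost) the same time, so the two statements race and may conflict,
# then wait until the replication status matches one of the 'compare'
# predicates.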
def parallel_run(cmd1, cmd2, compare):
    print 'parallel send: %s' % cmd1
    print 'parallel send: %s' % cmd2
    master.admin.socket.sendall('%s\n' % cmd1)
    replica.admin.socket.sendall('%s\n' % cmd2)

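    # Drain the admin responses so the sockets are clean for later commands.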
    master.admin.socket.recv(2048)
    replica.admin.socket.recv(2048)

    # Poll the replication upstream status on both nodes until one of the
    # supplied predicates is satisfied.
    while True:
        master_status = yaml.safe_load(master.admin(
            'box.info().replication[2].upstream.status', silent=True
        ))[0]
        replica_status = yaml.safe_load(replica.admin(
            'box.info().replication[1].upstream.status', silent=True
        ))[0]
        if any(f(master_status, replica_status) for f in compare):
            print 'replication state is correct'
            break
        sleep(0.01)

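# Restart both instances from a clean state, wire them into a master-master
# pair and create the test space used by the checks below.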
def prepare_cluster():
    print 'reset master-master replication'
    master.stop()
    master.cleanup()
    master.start()
    master.admin("box.schema.user.grant('guest', 'replication')", silent=True)

    replica.stop()
    replica.cleanup()
    replica.start()

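    # Point the master back at the replica to form a master-master pair and
    # remember both instance ids for the LSN checks below.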
    master.admin("box.cfg{replication='%s'}" % replica.iproto.uri, silent=True)
    r1_id = replica.get_param('id')
    r2_id = master.get_param('id')

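    # Create the test space with a TREE primary index and seed nine rows.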
    master.admin("space = box.schema.space.create('test')", silent=True)
    master.admin("index = space:create_index('primary', { type = 'tree'})", silent=True)
    master.admin('for k = 1, 9 do space:insert{k, k*k} end', silent=True)

    # wait until each node has applied the other's changes (LSNs match)
    replica.wait_lsn(r2_id, master.get_lsn(r2_id))
    master.wait_lsn(r1_id, replica.get_lsn(r1_id))

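# Each test below races two statements and checks the resulting upstream
# status against a list of acceptable outcomes: a real conflict may stop
# replication, while commutative or no-op operations must leave both
# upstreams in 'follow'.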
# test1: the same update applied concurrently on master and replica
prepare_cluster()
parallel_run(
    "box.space.test:update(1, {{'#', 2, 1}})",
    "box.space.test:update(1, {{'#', 2, 1}})",
    [
        lambda x,y: x == 'stopped' or y == 'stopped',
        lambda x,y: x == 'follow' and y == 'follow',
    ]
)
check_replication([master, replica], '1')

# test2: insert different values under the same primary key
prepare_cluster()
parallel_run(
    'box.space.test:insert{20, 1}',
    'box.space.test:insert{20, 2}',
    [
        lambda x,y: x == 'stopped' or y == 'stopped',
        lambda x,y: x == 'follow' and y == 'follow',
    ]
)

# test3: update the same key to different values
prepare_cluster()
parallel_run(
    "box.space.test:update(2, {{'=', 2, 1}})",
    "box.space.test:update(2, {{'=', 2, 2}})",
    [lambda x,y: x == 'follow' and y == 'follow',]
)

# test4: CRDT-style commutative increments of the same field via update
prepare_cluster()
parallel_run(
    "box.space.test:update(1, {{'+', 2, 1}})",
    "box.space.test:update(1, {{'+', 2, 2}})",
    [lambda x,y: x == 'follow' and y == 'follow',]
)
check_replication([master, replica], '1')

# test5: delete a non-existing key
prepare_cluster()
parallel_run(
    "box.space.test:delete(999)",
    "box.space.test:delete(999)",
    [lambda x,y: x == 'follow' and y == 'follow',]
)
check_replication([master, replica])

# cleanup
replica.stop()
replica.cleanup()
server.stop()
server.cleanup()
server.deploy()