File: 040_pglogical_sync_during_write.pl

package info (click to toggle)
pglogical 2.4.6-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,300 kB
  • sloc: ansic: 39,268; sql: 4,466; perl: 693; makefile: 210; sh: 78
file content (325 lines) | stat: -rw-r--r-- 11,157 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
#
# Test for RT#60889, trying to reproduce an issue where sync fails during
# catchup with a walsender timeout for as-yet-unknown reasons.
#
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Data::Dumper;
use Test::More;
use Time::HiRes;
use Carp;
use Fcntl qw(:seek);

# Tunables for the pgbench write load. Each one can be overridden from the
# environment; the defaults give a small two-minute run.
my $PGBENCH_SCALE = $ENV{PGBENCH_SCALE} // 1;        # passed to pgbench as -s
my $PGBENCH_CLIENTS = $ENV{PGBENCH_CLIENTS} // 10;   # passed to pgbench as -c
my $PGBENCH_JOBS = $ENV{PGBENCH_JOBS} // 1;          # passed to pgbench as -j
my $PGBENCH_TIME = $ENV{PGBENCH_TIME} // 120;        # passed to pgbench as -T; warm-up is a tenth of this
# Written into the provider's wal_sender_timeout setting.
# NOTE(review): this is read from PGBENCH_TIMEOUT, not WALSENDER_TIMEOUT --
# confirm that the environment variable name is intended.
my $WALSENDER_TIMEOUT = $ENV{PGBENCH_TIMEOUT} // '5s';

# Report every die with a full stack trace, and turn Ctrl-C into a die so the
# same reporting applies when the run is interrupted.
$SIG{__DIE__} = sub { Carp::confess @_ };
$SIG{INT}  = sub { die("interrupted by SIGINT"); };

# Names shared by every later step of the test.
my $dbname     = 'pgltest';
my $super_user = 'super';

# Provider cluster: logical WAL, pglogical preloaded, and the walsender
# timeout under test. The test database is created once the server is up.
my $node_provider = get_new_node('provider');
$node_provider->init;
{
	my $provider_settings = qq{
wal_level = 'logical'
max_replication_slots = 12
max_wal_senders = 12
wal_sender_timeout = '$WALSENDER_TIMEOUT'
max_connections = 200
log_line_prefix = '%t %p '
shared_preload_libraries = 'pglogical'
track_commit_timestamp = on
pglogical.synchronous_commit = true
};
	$node_provider->append_conf('postgresql.conf', $provider_settings);
}
$node_provider->dump_info();
$node_provider->start();
$node_provider->safe_psql('postgres', "CREATE DATABASE $dbname");

# Subscriber cluster: same log_line_prefix as the provider, fsync disabled.
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init;
{
	my $subscriber_settings = q{
shared_preload_libraries = 'pglogical'
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
track_commit_timestamp = on
fsync=off
pglogical.synchronous_commit = true
log_line_prefix = '%t %p '
};
	$node_subscriber->append_conf('postgresql.conf', $subscriber_settings);
}
$node_subscriber->dump_info();
$node_subscriber->start();
$node_subscriber->safe_psql('postgres', "CREATE DATABASE $dbname");

# Turn each server into a pglogical node, provider first and subscriber
# second. Both get the same treatment: a superuser role, the pglogical
# extension installed at 1.0.0 and then updated, and a create_node() call
# whose DSN is the server's own connection string plus the test database and
# the superuser role. The connection strings are kept in file-level variables
# for the steps that follow.
my ($provider_connstr, $subscriber_connstr);
for my $setup (
	[ $node_provider,   'provider',   \$provider_connstr ],
	[ $node_subscriber, 'subscriber', \$subscriber_connstr ],
	)
{
	my ($node, $role, $connstr_ref) = @$setup;

	$node->safe_psql($dbname, "CREATE USER $super_user SUPERUSER;");
	$node->safe_psql($dbname, "CREATE EXTENSION IF NOT EXISTS pglogical VERSION '1.0.0';");
	$node->safe_psql($dbname, "ALTER EXTENSION pglogical UPDATE;");

	my $connstr = $$connstr_ref = $node->connstr;
	print "node_${role} - connstr : $connstr\n";

	$node->safe_psql($dbname,
		"SELECT * FROM pglogical.create_node(node_name := 'test_${role}', dsn := '$connstr dbname=$dbname user=$super_user');");
}

# Populate the provider's test database with the pgbench tables at the
# configured scale factor.
$node_provider->command_ok(
	[ 'pgbench', '-i', '-s', $PGBENCH_SCALE, $dbname ],
	'initialize pgbench');

my @pgbench_tables = qw(pgbench_accounts pgbench_tellers pgbench_history);

# Put each pgbench table into a replication set: pgbench_history goes into
# 'default_insert_only', the others into 'default'.
foreach my $table (@pgbench_tables)
{
	my $repset = $table eq 'pgbench_history' ? 'default_insert_only' : 'default';
	$node_provider->safe_psql($dbname,
		"SELECT * FROM pglogical.replication_set_add_table('$repset', '$table', false);");
}

# Subscribe the subscriber to the provider, copying both structure and data,
# then block until the subscription reports the 'replicating' status. The
# whole test is abandoned if that never happens.
my $create_subscription_sql = "SELECT pglogical.create_subscription(
    subscription_name := 'test_subscription',
    synchronize_structure := true,
    synchronize_data := true,
    provider_dsn := '$provider_connstr dbname=$dbname user=$super_user'
);";
$node_subscriber->safe_psql($dbname, $create_subscription_sql);

my $is_replicating = $node_subscriber->poll_query_until($dbname,
	q[SELECT EXISTS (SELECT 1 FROM pglogical.show_subscription_status() where subscription_name = 'test_subscription' AND status = 'replicating')]);
BAIL_OUT('subscription failed to reach "replicating" state')
	unless $is_replicating;

# Put a sustained write load on the pgbench tables with pgbench's default
# TPC-B-ish run. It is started in the background so the resync loop below can
# run while it is writing; its stdout and stderr are collected in scalars.

diag "provider is " . $node_provider->name;
diag "max_connections is " . $node_provider->safe_psql($dbname, 'SHOW max_connections;');

my $pgbench_stdout='';
my $pgbench_stderr='';
my $pgbench_handle = IPC::Run::start(
	[ 'pgbench', '-T', $PGBENCH_TIME, '-j', $PGBENCH_JOBS, '-s', $PGBENCH_SCALE, '-c', $PGBENCH_CLIENTS, $node_provider->connstr($dbname)],
        '>', \$pgbench_stdout, '2>', \$pgbench_stderr);
$pgbench_handle->pump();


# Wait for pgbench to connect: poll until some backend is seen running one of
# pgbench's UPDATE statements.
$node_provider->poll_query_until($dbname,
	q[SELECT EXISTS (SELECT 1 FROM pg_stat_activity WHERE query like 'UPDATE pgbench%')])
	or BAIL_OUT('pgbench process is not running currently');

# From here on have the provider log DDL statements.
$node_provider->safe_psql($dbname, q[ALTER SYSTEM SET log_statement = 'ddl']);
$node_provider->safe_psql($dbname, q[SELECT pg_reload_conf();]);

# Let it warm up for a while. Time::HiRes::sleep is used rather than the
# built-in so that a fractional tenth of PGBENCH_TIME is honoured instead of
# being truncated, matching the duration announced in the note.
note "warming up pgbench for " . ($PGBENCH_TIME/10) . "s";
Time::HiRes::sleep($PGBENCH_TIME/10);
note "done warmup";

# Open both server logs for reading; the resync loop scans them for messages
# written while each table is being resynchronized.
open(my $publog, "<", $node_provider->logfile)
	or die "can't open log file for provider at " . $node_provider->logfile . ": $!";
open(my $sublog, "<", $node_subscriber->logfile)
	or die "can't open log file for subscriber at " . $node_subscriber->logfile . ": $!";

# Remember which backends serve the main subscription so that a later
# walsender timeout can be attributed to the apply worker or to a sync worker.
my $walsender_pid = int($node_provider->safe_psql($dbname, q[SELECT pid FROM pg_stat_activity WHERE application_name = 'test_subscription']));
my $apply_pid = int($node_subscriber->safe_psql($dbname, q[SELECT pid FROM pg_stat_activity WHERE application_name LIKE '%apply%']));
note "wal sender pid is $walsender_pid; apply worker pid is $apply_pid";

# Seek to log EOF so that only output written from now on is examined.
# seek() takes (FILEHANDLE, POSITION, WHENCE): offset 0 relative to SEEK_END.
seek($publog, 0, SEEK_END)
	or die "can't seek to end of provider log: $!";
seek($sublog, 0, SEEK_END)
	or die "can't seek to end of subscriber log: $!";

# Main test loop. While pgbench keeps writing on the provider, ask the
# subscriber to resynchronize each pgbench table in turn, wait for the table
# to return to sync state 'r', then scan the log output written in the
# meantime for walsender "replication timeout" messages. One pass over all
# tables is one iteration ($i); passes repeat until no backend with
# application_name 'pgbench' is left on the provider.
my $i = 1;
do {
	# Resync all the tables in turn
	EACH_TABLE: for my $tbl (@pgbench_tables)
	{
		# Where each log stood before this resync; scanning resumes from here.
		my $publogpos = tell($publog);
		my $sublogpos = tell($sublog);

		my $resync_start = [Time::HiRes::gettimeofday()];

		eval {
			$node_subscriber->safe_psql($dbname,
					"SELECT * FROM pglogical.alter_subscription_resynchronize_table('test_subscription', '$tbl');");
		};
		# Copy $@ straight away: the follow-up query below could overwrite it.
		if (my $resync_error = $@)
		{
			diag "attempt to resync $tbl failed with $resync_error; sync_status is currently " .
				$node_subscriber->safe_psql($dbname, "SELECT sync_status FROM pglogical.local_sync_status WHERE sync_relname = '$tbl'");
			fail("$tbl didn't sync: resync request failed");
			next EACH_TABLE;
		}

		# Poll the table's sync status until it is back to 'r'.
		while (1)
		{
			# usleep() counts microseconds, so this is a very short pause.
			Time::HiRes::usleep(100);

			my $running = $node_subscriber->safe_psql($dbname,
				qq[SELECT pid FROM pg_stat_activity WHERE application_name LIKE '%sync%']);

			my $status = $node_subscriber->safe_psql($dbname,
				qq[SELECT sync_status FROM pglogical.local_sync_status WHERE sync_relname = '$tbl']);

			if ($status eq 'r')
			{
				pass("$tbl synced on iteration $i (elapsed " . Time::HiRes::tv_interval($resync_start) . ")" );
				last;
			}
			elsif ($status eq 'i')
			{
				# worker still starting
			}
			elsif ($status eq 'y')
			{
				# worker done but master hasn't noticed yet
				# keep looping until master notices and switches to 'r'
			}
			elsif (!$running)
			{
				# NOTE(review): the loop keeps polling after recording this
				# failure, so a table that never reaches 'r' fails repeatedly
				# and never ends the wait. Left as is because $running is
				# sampled before $status and the two can disagree
				# momentarily -- confirm whether a bail-out is wanted here.
				fail("$tbl didn't sync on iteration $i, sync worker exited (running=$running) while sync state was '$status' (elapsed " . Time::HiRes::tv_interval($resync_start) . ")" );
			}
		}

		# look for walsender timeouts in logs since last test
		# We must seek to reset any prior eof marker. seek() takes
		# (FILEHANDLE, POSITION, WHENCE), so the saved offset is the
		# position and SEEK_SET makes it relative to the start of the file.
		seek($publog, $publogpos, SEEK_SET)
			or die "can't seek provider log to $publogpos: $!";
		seek($sublog, $sublogpos, SEEK_SET)
			or die "can't seek subscriber log to $sublogpos: $!";
		# then look for log lines of interest
		my $timeout_line;
		my $finished_sync_line;
		while (my $line = <$publog>)
		{
			# Only the first timeout seen for this table is investigated.
			if ($line =~ qr/replication timeout/ && !$timeout_line)
			{
				$timeout_line = $line;

				diag "status line after failed sync is "
					. $node_subscriber->safe_psql($dbname,
					 	qq[SELECT * FROM pglogical.local_sync_status WHERE sync_relname = '$tbl']);

				# The provider logs with log_line_prefix '%t %p ', so the
				# number captured just before "LOG:" is the pid of the
				# backend that wrote the line.
				if ($line =~ qr/\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} [A-Z]+ (\d+) LOG:/)
				{
					if (int($1) == $walsender_pid)
					{
						diag "terminated walsender was the main walsender for the apply worker, trying to make apply core of pid $apply_pid";
						# system() returns 0 when the command succeeded, so
						# success has to be tested with == 0. List form keeps
						# the shell out of the invocation.
						if (system('gcore', '-o', "tmp_check/core.$apply_pid", $apply_pid) == 0)
						{
							diag "core saved as tmp_check/core.$apply_pid";
						}
						else
						{
							diag "couldn't save core, see logs";
						}
					}
					else
					{
						diag "terminated walsender was not for apply worker, looking for a sync worker";
						my $sync_pid = int($node_subscriber->safe_psql($dbname, q[SELECT pid FROM pg_stat_activity WHERE application_name LIKE '%sync%']));
						if ($sync_pid)
						{
							diag "found running sync worker $sync_pid, trying to make core";
							system('gcore', $sync_pid);
						}
						else
						{
							diag "no sync worker found running";
						}
					}
				}
				else
				{
					carp "couldn't match line format for $line";
				}
			}
		}

		# Drain the subscriber log as well, so the next table starts from
		# its current end, remembering the first "finished sync" line.
		while (my $line = <$sublog>)
		{
			if ($line =~ qr/finished sync of table/ && !$finished_sync_line)
			{
				$finished_sync_line = $line;
			}
		}

		# This test is racey, because we don't emit this message until
		# after the sync commits so we quite possibly won't see it here
		# after we finish waiting for synced state.
		#
		#isnt($finished_sync_line, undef, "found finished sync line in last test logs");

		is($timeout_line, undef, "no walsender timeout since last test");
	}

	$i ++;

	# and repeat until pgbench exits
} while ($node_provider->safe_psql($dbname, q[SELECT 1 FROM pg_stat_activity WHERE application_name = 'pgbench']));

# Reap the background pgbench run and record what it printed.
$pgbench_handle->finish;
note "##### output of pgbench #####";
note $pgbench_stdout;
note "##### end of output #####";

# A non-zero result fails the test; in that case also show the collected
# stderr, which is otherwise never reported.
is($pgbench_handle->full_result(0), 0, "pgbench run successful")
	or diag "pgbench stderr was:\n$pgbench_stderr";

note " waiting for catchup";

# Wait for catchup
$node_provider->safe_psql($dbname, 'SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);');

note "comparing tables";

# ORDER BY column positions used when dumping a table for comparison. Table
# names are strings, so they are looked up by key (string equality); a
# numeric == comparison would treat every name as 0 and match the first one.
my %sortkey_for = (
	pgbench_history  => "1, 2, 3, 4",
	pgbench_tellers  => "1, 2",
	pgbench_accounts => "1, 2",
	pgbench_branches => "1",
);

# Compare final table entries on provider and subscriber. Row counts must
# match; when they don't, both copies are dumped in sorted order under
# tmp_check/ together with a unified diff, to help diagnose the mismatch.
for my $tbl (@pgbench_tables)
{
	my $rowcount_provider = $node_provider->safe_psql($dbname,
			"SELECT count(*) FROM $tbl;");

	my $rowcount_subscriber = $node_subscriber->safe_psql($dbname,
			"SELECT count(*) FROM $tbl;");

	my $matched = is($rowcount_subscriber, $rowcount_provider,
		"final $tbl row counts match after sync");
	if (!$matched)
	{
		diag "final provider rowcount for $tbl is $rowcount_provider, but subscriber has $rowcount_subscriber";

		# Fall back to the first column for a table with no entry, so the
		# ORDER BY clause is never left empty.
		my $sortkey = $sortkey_for{$tbl} // "1";

		# Compare the tables
		$node_provider->safe_psql($dbname, qq[\\copy (SELECT * FROM $tbl ORDER BY $sortkey) to tmp_check/$tbl-provider]);
		$node_subscriber->safe_psql($dbname, qq[\\copy (SELECT * FROM $tbl ORDER BY $sortkey) to tmp_check/$tbl-subscriber]);
		$node_subscriber->safe_psql($dbname, qq[\\copy (SELECT * FROM $tbl, pglogical.xact_commit_timestamp_origin($tbl.xmin) ORDER BY $sortkey) to tmp_check/$tbl-subscriber-detail]);
		IPC::Run::run(['diff', '-u', "tmp_check/$tbl-provider", "tmp_check/$tbl-subscriber"], '>', "tmp_check/$tbl-diff");
		diag "differences between $tbl on provider and subscriber recorded in tmp_check/";
	}

}

# Tear down both servers, subscriber first, then close the test plan.
for my $node ($node_subscriber, $node_provider)
{
	$node->teardown_node();
}

done_testing;