1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
|
#!/usr/bin/python3
# # -*- coding: utf-8 -*-
#
# Copyright (c) 2016-2023, CYBERTEC PostgreSQL International GmbH
#
# Test the squeeze_table() function while concurrent data changes are taking
# place. To verify that all the concurrent changes have been captured and
# processed, run the test once again w/o the squeeze_table() function and
# compare results.
#
# If the test should run for a long time and the user suspects the tables can
# grow beyond the disk space available, they can pass a reasonable value of
# --test-duration and use the --test-iterations option to ensure that the whole
# test (from table creation to verification of results) runs many times.
import argparse
import psycopg
import random
import sys
import threading
import time
from threading import Thread, Timer
# Command line interface of the test script.
parser = argparse.ArgumentParser()

# Connection options.
parser.add_argument(
    "--host",
    default="localhost",
    help="Database server host",
)
parser.add_argument(
    "--port",
    default="5432",
    help="Database server port",
)
parser.add_argument(
    "--database",
    default="postgres",
    help="The test database name",
)
parser.add_argument(
    "--user",
    default="postgres",
    help="The user that connects to the test database",
)

# Options controlling the test itself.
parser.add_argument(
    "--test-duration",
    type=int,
    default=5,
    help="Test duration in seconds",
)
parser.add_argument(
    "--test-iterations",
    type=int,
    default=1,
    help="How many times should the test be executed",
)
parser.add_argument(
    "--no-verification",
    action="store_true",
    help="Skip verification of result, i.e. only test stability",
)

args = parser.parse_args()
# TODO Turn these into command line options.

# How many tests are instantiated from each test template.
test_instances = 5

# Schema of the tables that squeeze_table() is applied to, and schema of the
# tables used to replay the commands during verification.
test_schema, verification_schema = "public", "expected"

# Set to False by stop_tests() as soon as a failure is reported.
test_succeeded = True
def get_connection():
    """Open and return a new connection to the test database.

    Host, port, database name and user come from the parsed command line
    arguments.
    """
    conn_params = {
        "host": args.host,
        "port": args.port,
        "dbname": args.database,
        "user": args.user,
    }
    return psycopg.connect(**conn_params)
def start_tests():
    """Reset the global state that controls one run of the test suite."""
    global test_done, timer_executions, timer_max_executions

    # All threads should check this variable and stop as soon as it becomes
    # True.
    test_done = False

    # Instead of letting the timer wait for the whole test time, we start it
    # many times for a 1-second interval so that user can interrupt the
    # application.
    timer_executions = 0

    # Since the test interval is one second, the number of executions is the
    # test duration in seconds
    timer_max_executions = args.test_duration
def stop_tests(success):
    """Tell all threads to stop and record a failure if there was one.

    success -- False if the caller stops the test because of an error. A
    failure is never reverted by a later call with success=True.
    """
    global test_done, test_succeeded

    test_done = True
    if success:
        return
    test_succeeded = False
def maybe_stop_tests():
    """Count one timer execution and stop the tests once enough have elapsed.

    The tests are stopped (successfully) when the number of executions
    reaches timer_max_executions.
    """
    global timer_executions

    timer_executions += 1
    if timer_executions >= timer_max_executions:
        stop_tests(True)
def setup_database(cur):
    """Make sure the extension and both schemas used by the test exist.

    cur -- an open database cursor used to execute the commands.
    """
    cur.execute("SELECT extversion FROM pg_extension WHERE extname='pg_squeeze'")
    extension_missing = cur.rowcount == 0
    if extension_missing:
        cur.execute("CREATE EXTENSION pg_squeeze")

    for schema in (test_schema, verification_schema):
        cur.execute("CREATE SCHEMA IF NOT EXISTS %s" % schema)
class CommandThread(Thread):
    """Thread that keeps executing randomly chosen commands.

    cmds -- list of SQL commands to choose from.

    cmds_executed -- list to which the index (into cmds) of each committed
    command is appended so that the commands can be replayed during
    verification, or None if the commands should not be recorded.
    """

    def __init__(self, cmds, cmds_executed):
        super().__init__()
        self.cmds = cmds
        self.cmds_executed = cmds_executed

    def run(self):
        """Execute random commands, one per transaction, until test_done.

        Any error is printed and makes the whole test stop as failed.
        """
        try:
            con = get_connection()
            try:
                # Commit explicitly; for debugging purposes we might need to
                # print out the XID of the transaction.
                con.autocommit = False
                cur = con.cursor()

                while not test_done:
                    i = random.randrange(len(self.cmds))
                    cur.execute(self.cmds[i])
                    con.commit()

                    if self.cmds_executed is not None:
                        # Record the command so it can be replayed during
                        # verification.
                        self.cmds_executed.append(i)
            finally:
                # Close the connection even if a command failed, so that the
                # failed thread does not leave an open session behind.
                con.close()
        except Exception as e:
            print(e)
            stop_tests(False)
class SqueezeParams:
    """Arguments of a single squeeze_table() call.

    table -- name of the table to be squeezed.

    index -- name of the index passed to squeeze_table(), or None if no index
    should be passed.
    """

    def __init__(self, table, index):
        self.table, self.index = table, index
class SqueezeThread(Thread):
    """Thread that keeps calling squeeze_table() on randomly chosen tables.

    params_array is an array of SqueezeParams instances

    delay is the number of seconds to wait before the next squeeze should
    start.

    The "done" attribute becomes True when run() has finished, whether
    successfully or not. verify_tests() waits for it.
    """

    def __init__(self, params_array, delay):
        super().__init__()
        self.params_array = params_array
        self.delay = delay
        self.done = False

    def run(self):
        """Call squeeze() every "delay" seconds until test_done is set."""
        try:
            self.con = get_connection()
            try:
                self.con.autocommit = True
                self.cur = self.con.cursor()

                while not test_done:
                    timer = Timer(self.delay, self.squeeze)
                    timer.start()
                    timer.join()
            finally:
                self.con.close()
        except Exception as e:
            # Report the problem the same way squeeze() does, rather than
            # letting the thread die silently while the test goes on.
            print(e)
            stop_tests(False)
        finally:
            # Must be set even on failure, otherwise a caller waiting for
            # this flag would wait forever.
            self.done = True

    def squeeze(self):
        """Squeeze the table described by a random item of params_array.

        Any error, including a record found in squeeze.errors, is printed and
        makes the whole test stop as failed.
        """
        try:
            params = random.choice(self.params_array)
            ind = "'%s'" % params.index if params.index else "NULL"

            self.cur.execute("SET maintenance_work_mem='1MB'")
            self.cur.execute(
                "SELECT squeeze.squeeze_table('%s', '%s', %s)" %
                (test_schema, params.table, ind,))

            self.cur.execute("SELECT count(*) FROM squeeze.errors")
            row = self.cur.fetchone()
            if row[0] > 0:
                # XXX The failure could be caused by a concurrent call of
                # squeeze_table(). Nevertheless, it's a reason to stop.
                raise Exception("squeeze_table() failed")
        except Exception as e:
            print(e)
            stop_tests(False)
class TestTemplate:
    """Description of a test, with placeholders not substituted yet.

    table is the (templated) name of the test table.

    cmds_setup is a list of commands to create the test table and any other
    database objects needed. The first item must be a CREATE TABLE command,
    and it must contain two formatting strings: one to allow insertion of the
    UNLOGGED keyword and one to insert the schema name.

    cmds is a list of commands to be executed in random order. The commands
    must be such that a new execution in the same order on an empty table
    produces the same results as during the first execution. For example,
    only stable functions (in terms of pg_proc(provolatile)) may be used.

    check_query is an SQL query that compares the data produced by the
    stability test to the data produced by replaying the same commands and
    returns zero rows if the sets are identical.
    """

    def __init__(self, table, cmds_setup, cmds, check_query):
        self.table, self.cmds_setup = table, cmds_setup
        self.cmds, self.check_query = cmds, check_query
class Test:
    """A single test: one instance of a template with placeholders filled in.

    Each command exists in two variants: one for the test schema and one
    (with the "_verify" suffix) for the verification schema.
    """

    def __init__(self, template, inst):
        self.table = template.table.format(_sch_=test_schema, _inst_=inst)
        self.table_verify = template.table.format(_sch_=verification_schema,
                                                  _inst_=inst)

        # Placeholder values for the test schema and for the verification
        # schema respectively.
        subst = {"_sch_": test_schema,
                 "_inst_": inst,
                 "_table_": self.table}
        subst_verify = {"_sch_": verification_schema,
                        "_inst_": inst,
                        "_table_": self.table_verify}

        self.cmds_setup = [c.format(_wal_="", **subst)
                           for c in template.cmds_setup]
        self.cmds = [c.format(**subst) for c in template.cmds]

        # The verification table is created as UNLOGGED.
        self.cmds_setup_verify = [c.format(_wal_="UNLOGGED", **subst_verify)
                                  for c in template.cmds_setup]
        self.cmds_verify = [c.format(**subst_verify) for c in template.cmds]

        self.check_query = template.check_query.format(
            _sch_=test_schema,
            _sch_verify_=verification_schema,
            _inst_=inst,
            _table_=self.table,
            _table_verify_=self.table_verify)

    def setup(self):
        """(Re)create the test table and prepare the command log."""
        con = get_connection()
        con.autocommit = True
        cur = con.cursor()

        cur.execute("DROP TABLE IF EXISTS %s" % (self.table,))
        for setup_cmd in self.cmds_setup:
            cur.execute(setup_cmd)
        con.close()

        # Without verification there is no need to remember the commands
        # executed.
        self.cmds_executed = None if args.no_verification else []

    def start(self):
        """Start the thread that executes the commands of this test."""
        self.cmd_thread = CommandThread(self.cmds, self.cmds_executed)
        self.cmd_thread.start()
def run_tests(tests):
    """Execute the test suite once.

    All tests are set up first and only then started, followed by the global
    squeeze_thread. The function returns when test_done becomes True.
    """
    print('Running test...')
    start_tests()

    for t in tests:
        t.setup()
    for t in tests:
        t.start()
    squeeze_thread.start()

    # Wait in 1-second steps; maybe_stop_tests() sets test_done as soon as the
    # test duration has elapsed.
    while not test_done:
        tick = Timer(1, maybe_stop_tests)
        tick.start()
        tick.join()
def verify_tests(tests, squeeze_thread, con, con_vac):
    """Replay the recorded commands and compare the results.

    Run the same SQL statements on each table again, w/o the interference
    with squeeze_table(). The resulting tables should be identical.

    tests -- the Test instances that have been run; test.cmds_executed tells
    which commands to replay and in which order.

    squeeze_thread -- the squeeze thread; its "done" attribute is waited for.

    con -- connection used for the replay (commits are issued explicitly).

    con_vac -- autocommit connection used for VACUUM FULL, which cannot run
    inside a transaction block.

    Both connections are closed before the function returns normally. The
    result of each comparison is only printed.
    """
    print("Verifying results...")

    cur = con.cursor()
    cur_vac = con_vac.cursor()

    # Make sure that the last call of squeeze_table() finished.
    while not squeeze_thread.done:
        time.sleep(1)

    cur.execute("CREATE SCHEMA IF NOT EXISTS %s" % verification_schema)
    con.commit()

    for test in tests:
        # Create the table to execute the queries again.
        cur.execute("DROP TABLE IF EXISTS %s" % (test.table_verify,))
        for cmd in test.cmds_setup_verify:
            cur.execute(cmd)

        # We try to run VACUUM FULL when it seems appropriate (see below), so
        # do not let autovacuum interfere with that effort. The replay only
        # modifies the verification table, so that is the table concerned.
        cur.execute("ALTER TABLE %s SET (autovacuum_enabled=false)" %
                    test.table_verify)
        cur.execute("ALTER TABLE %s SET (toast.autovacuum_enabled=false)" %
                    test.table_verify)
        con.commit()

        # Replay the commands, using a separate transaction for each command.
        # "rows" estimates the total number of row versions (live + dead) in
        # the verification table, "rows_live" the number of live rows.
        rows = 0.0
        rows_live = 0.0
        for i in test.cmds_executed:
            cmd = test.cmds_verify[i]
            cur.execute(cmd)

            cmd_lower = cmd.lower()
            if 'insert' in cmd_lower:
                rows = rows + cur.rowcount
                rows_live = rows_live + cur.rowcount
            elif 'update' in cmd_lower:
                # UPDATE does not change the number of live rows, it just
                # adds one dead row per row updated.
                rows = rows + cur.rowcount
            elif 'delete' in cmd_lower:
                # DELETE removes a live row, but does not change the total
                # number of rows.
                rows_live = rows_live - cur.rowcount
            con.commit()

            # The check queries run much faster if the bloat is kept at
            # reasonable level.
            if rows > 0 and rows_live / rows < 0.5:
                cur_vac.execute("VACUUM FULL %s" % test.table_verify)
                rows = rows_live

        # Compare the tables. The check query returns the rows that differ.
        cur.execute(test.check_query)
        con.commit()
        if cur.rowcount == 0:
            print('Test passed for table "%s"' % test.table)
            cur.execute("DROP TABLE %s" % (test.table_verify,))
            con.commit()
        else:
            # The verification table is kept so the difference can be
            # inspected.
            print('Found difference for table "%s"' % test.table)

    con.close()
    con_vac.close()
# Multiple independently running tests can be created from each
# template. {_inst_} in the command text stands for the instance number.
#
# {_wal_} is used to substitute the UNLOGGED option, which is useful during
# verification of the test results.
#
# {_sch_} is used to substitute the schema the table will reside in.
#
# Likewise, {_sch_verify_} is a placeholder for the schema where the
# verification of the test results is performed.
#
# {_table_} and {_table_verify_} are the test and verification tables with the
# placeholders substituted.
#
# Each check query returns the rows that exist in only one of the two tables,
# i.e. no rows if the tables are identical.
test_templates = [
    TestTemplate(
        table="{_sch_}.a_{_inst_}",
        cmds_setup=[
            "CREATE {_wal_} TABLE {_table_}(i serial NOT NULL PRIMARY KEY, j int)",
        ],
        cmds=[
            "INSERT INTO {_table_}(j) SELECT g.i FROM generate_series(0, 255) AS g(i)",
            "UPDATE {_table_} SET j = j + 1 WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 0 ORDER BY i LIMIT 256)",
            "UPDATE {_table_} SET j = j + 1 WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 1 ORDER BY i LIMIT 256)",
            "DELETE FROM {_table_} WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 0 ORDER BY i LIMIT 128)",
            "DELETE FROM {_table_} WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 1 ORDER BY i LIMIT 128)",
        ],
        # Both columns of t1 must be compared to those of t2, otherwise a
        # difference in "j" would go unnoticed.
        check_query="SELECT * FROM {_table_} AS t1 FULL JOIN {_table_verify_} AS t2 ON (t1.i, t1.j) = (t2.i, t2.j) WHERE t1.i ISNULL OR t2.i ISNULL",
    ),
    # TOAST
    TestTemplate(
        table="{_sch_}.b_{_inst_}",
        cmds_setup=[
            "CREATE {_wal_} TABLE {_table_}(i serial NOT NULL PRIMARY KEY, j text)",
            "CREATE OR REPLACE FUNCTION {_sch_}.long_string() RETURNS text LANGUAGE sql AS $$ SELECT string_agg(h.x::text, ' ') FROM generate_series(0, 4095) as h(x);$$",
        ],
        cmds=[
            "INSERT INTO {_table_}(j) SELECT {_sch_}.long_string() FROM generate_series(0, 16) AS g(i)",
            "UPDATE {_table_} SET j = {_sch_}.long_string() WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 0 ORDER BY i LIMIT 256)",
            "UPDATE {_table_} SET j = {_sch_}.long_string() WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 1 ORDER BY i LIMIT 256)",
            "DELETE FROM {_table_} WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 0 ORDER BY i LIMIT 8)",
            "DELETE FROM {_table_} WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 1 ORDER BY i LIMIT 8)",
        ],
        # Both columns of t1 must be compared to those of t2, otherwise a
        # difference in "j" would go unnoticed.
        check_query="SELECT * FROM {_table_} AS t1 FULL JOIN {_table_verify_} AS t2 ON (t1.i, t1.j) = (t2.i, t2.j) WHERE t1.i ISNULL OR t2.i ISNULL",
    ),
    # Update identity key.
    TestTemplate(
        table="{_sch_}.c_{_inst_}",
        cmds_setup=[
            "CREATE {_wal_} TABLE {_table_}(i serial NOT NULL PRIMARY KEY, j real)",
        ],
        cmds=[
            # The values should be sparse so that the UPDATE can increment as
            # many values as possible.
            "WITH tmp(i) AS (SELECT max(i) FROM (SELECT i FROM {_table_} UNION VALUES (0)) AS s) INSERT INTO {_table_} SELECT g.i FROM tmp, generate_series(tmp.i + 1, tmp.i + 256) AS g(i)",
            # To avoid violation of the primary key, only update those rows
            # for which i + 1 does not exist.
            "UPDATE {_table_} SET i = i + 1 WHERE i IN (SELECT s1.x FROM (SELECT i, i + 1 FROM {_table_}) s1(x, y) LEFT JOIN (SELECT i FROM {_table_}) s2(x) ON s1.y = s2.x WHERE s2.x ISNULL)",
            "DELETE FROM {_table_} WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 0 ORDER BY i LIMIT 128)",
            "DELETE FROM {_table_} WHERE i IN (SELECT i FROM {_table_} WHERE i % 2 = 1 ORDER BY i LIMIT 128)",
        ],
        # Only the key is compared here; this template never sets "j".
        check_query="SELECT * FROM {_table_} AS t1 FULL JOIN {_table_verify_} AS t2 ON t1.i = t2.i WHERE t1.i ISNULL OR t2.i ISNULL",
    ),
]
# One-time preparation of the database. Any failure is printed and terminates
# the script with exit code 1.
con = get_connection()
con.autocommit = True
cur = con.cursor()

check_ok = True
try:
    setup_database(cur)

    # Prepare for error checking.
    cur.execute("TRUNCATE TABLE squeeze.errors")

    # Make sure that the squeeze worker is on.
    cur.execute("SELECT squeeze.start_worker()")
except Exception as e:
    print(e)
    check_ok = False
finally:
    con.close()

if not check_ok:
    sys.exit(1)
def add_squeeze_params(params, tabname_template):
    """Append squeeze parameters for each instance of a table to params.

    params -- list that receives the SqueezeParams instances.

    tabname_template -- table name containing the {_inst_} placeholder.

    Two items are added per instance: one with no index and one with the
    index named <table>_pkey.
    """
    for inst in range(test_instances):
        tabname = tabname_template.format(_inst_=inst)
        indname = tabname + "_pkey"
        params.append(SqueezeParams(tabname, None))
        params.append(SqueezeParams(tabname, indname))
# Run the whole test (from table creation to verification of results) the
# requested number of times. The script exits with code 1 as soon as an
# iteration fails.
for i in range(args.test_iterations):
    con = get_connection()
    con.autocommit = False
    # An extra connection for VACUUM because it cannot run inside transaction
    # block.
    if not args.no_verification:
        con_vac = get_connection()
        con_vac.autocommit = True

    params = []
    add_squeeze_params(params, "a_{_inst_}")
    add_squeeze_params(params, "b_{_inst_}")
    add_squeeze_params(params, "c_{_inst_}")

    # TODO Multiple squeeze threads? (If so, have setup_database() check the
    # squeeze.workers_per_database GUC.)
    squeeze_thread = SqueezeThread(params, 1.0)

    tests = []
    for inst in range(test_instances):
        for template in test_templates:
            tests.append(Test(template, inst))

    try:
        run_tests(tests)
    except KeyboardInterrupt as e:
        # Pay special attention to KeyboardInterrupt because the join() method
        # of Timer can be interrupted, in which case maybe_stop_tests never
        # gets called.
        print(e)
        stop_tests(False)

    if not test_succeeded:
        con.close()
        if not args.no_verification:
            con_vac.close()
        sys.exit(1)

    # The test ran to completion, so all the threads were started. Wait for
    # them to finish, otherwise a command still in progress could be missing
    # from the replay, or a thread could survive into the next iteration
    # (which resets test_done).
    for test in tests:
        test.cmd_thread.join()
    squeeze_thread.join()

    if args.no_verification:
        # Nothing else is going to use (and close) the connection.
        con.close()
        continue

    try:
        # verify_tests() closes both connections when it succeeds.
        verify_tests(tests, squeeze_thread, con, con_vac)
    except Exception as e:
        print(e)
        con.close()
        con_vac.close()
        sys.exit(1)
# All iterations are done, so ask the squeeze worker to stop. The connection
# is closed explicitly rather than left open until the interpreter exits.
con = get_connection()
try:
    cur = con.cursor()
    cur.execute("SELECT squeeze.stop_worker()")
finally:
    con.close()
|