1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
#! /bin/bash
set -x
set -e
# This script expects the following environment variables to be set:
#
# - PGCOPYDB_SOURCE_PGURI
# - PGCOPYDB_TARGET_PGURI
# - PGCOPYDB_TABLE_JOBS
# - PGCOPYDB_INDEX_JOBS
# make sure source and target databases are ready
pgcopydb ping
psql -o /tmp/s.out -d ${PGCOPYDB_SOURCE_PGURI} -1 -f /usr/src/pagila/pagila-schema.sql
psql -o /tmp/d.out -d ${PGCOPYDB_SOURCE_PGURI} -1 -f /usr/src/pagila/pagila-data.sql
# alter the pagila schema to allow capturing DDLs without pkey
psql -d ${PGCOPYDB_SOURCE_PGURI} -f /usr/src/pgcopydb/ddl.sql
# create the replication slot that captures all the changes
# PGCOPYDB_OUTPUT_PLUGIN is set to test_decoding in docker-compose.yml
coproc ( pgcopydb snapshot --follow )
sleep 1
# now setup the replication origin (target) and the pgcopydb.sentinel (source)
pgcopydb stream setup
# pgcopydb clone uses the environment variables
pgcopydb clone
kill -TERM ${COPROC_PID}
wait ${COPROC_PID}
# now that the copying is done, inject some SQL DML changes to the source
psql -d ${PGCOPYDB_SOURCE_PGURI} -f /usr/src/pgcopydb/dml.sql
# grab the current LSN, it's going to be our streaming end position
lsn=`psql -At -d ${PGCOPYDB_SOURCE_PGURI} -c 'select pg_current_wal_lsn()'`
# and prefetch the changes captured in our replication slot
pgcopydb stream prefetch --resume --endpos "${lsn}" --debug
SHAREDIR=/var/lib/postgres/.local/share/pgcopydb
WALFILE=000000010000000000000002.json
SQLFILE=000000010000000000000002.sql
# now compare JSON output, skipping the lsn and nextlsn fields which are
# different at each run
expected=/tmp/expected.json
result=/tmp/result.json
JQSCRIPT='del(.lsn) | del(.nextlsn) | del(.timestamp) | del(.xid)'
jq "${JQSCRIPT}" /usr/src/pgcopydb/${WALFILE} > ${expected}
jq "${JQSCRIPT}" ${SHAREDIR}/${WALFILE} > ${result}
# first command to provide debug information, second to stop when returns non-zero
diff -I 'last_update' ${expected} ${result} || (cat ${SHAREDIR}/${WALFILE} && exit 1)
# now prefetch the changes again, which should be a noop
pgcopydb stream prefetch --resume --endpos "${lsn}" --notice
# now transform the JSON file into SQL
SQLFILENAME=`basename ${WALFILE} .json`.sql
pgcopydb stream transform --debug ${SHAREDIR}/${WALFILE} /tmp/${SQLFILENAME}
# we should get the same result as `pgcopydb stream prefetch`
diff ${SHAREDIR}/${SQLFILE} /tmp/${SQLFILENAME}
# we should also get the same result as expected (discarding LSN numbers)
# and also discarding ON UPDATE triggers for the timestamps (EXECUTE/last_update)
DIFFOPTS='-I BEGIN -I COMMIT -I KEEPALIVE -I SWITCH -I ENDPOS -I EXECUTE'
diff ${DIFFOPTS} /usr/src/pgcopydb/${SQLFILE} ${SHAREDIR}/${SQLFILENAME} || (cat ${SHAREDIR}/${SQLFILENAME} && exit 1)
# now allow for replaying/catching-up changes
pgcopydb stream sentinel set apply
# now apply the SQL file to the target database
pgcopydb stream catchup --resume --endpos "${lsn}" -vv
# now apply AGAIN the SQL file to the target database, skipping transactions
pgcopydb stream catchup --resume --endpos "${lsn}" -vv
# test whether transform propertly sets xid for continued transactions.
pgcopydb stream transform --debug /usr/src/pgcopydb/continued-txn.json /tmp/continued-txn.sql
diff /usr/src/pgcopydb/continued-txn.sql /tmp/continued-txn.sql || (cat /tmp/continued-txn.sql && exit 1)
# cleanup
pgcopydb stream cleanup
|