1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
-- predictability
SET synchronous_commit = on;
-- fail because we're creating a slot while in an xact with xid
BEGIN;
SELECT pg_current_xact_id() = '0';
?column?
----------
f
(1 row)
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
ERROR: cannot create logical replication slot in transaction that has performed writes
ROLLBACK;
-- fail because we're creating a slot while in a subxact whose topxact has an xid
BEGIN;
SELECT pg_current_xact_id() = '0';
?column?
----------
f
(1 row)
SAVEPOINT barf;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
ERROR: cannot create logical replication slot in transaction that has performed writes
ROLLBACK TO SAVEPOINT barf;
ROLLBACK;
-- succeed, outside tx.
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)
-- succeed, in tx without xid.
BEGIN;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
?column?
----------
init
(1 row)
COMMIT;
CREATE TABLE nobarf(id serial primary key, data text);
INSERT INTO nobarf(data) VALUES('1');
-- decoding works in transaction with xid
BEGIN;
SELECT pg_current_xact_id() = '0';
?column?
----------
f
(1 row)
-- don't show yet, haven't committed
INSERT INTO nobarf(data) VALUES('2');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-----------------------------------------------------------
BEGIN
table public.nobarf: INSERT: id[integer]:1 data[text]:'1'
COMMIT
(3 rows)
COMMIT;
INSERT INTO nobarf(data) VALUES('3');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-----------------------------------------------------------
BEGIN
table public.nobarf: INSERT: id[integer]:2 data[text]:'2'
COMMIT
BEGIN
table public.nobarf: INSERT: id[integer]:3 data[text]:'3'
COMMIT
(6 rows)
-- Decoding works in transaction that issues DDL
--
-- We had issues handling relcache invalidations with these, see
-- https://www.postgresql.org/message-id/e56be7d9-14b1-664d-0bfc-00ce9772721c@gmail.com
CREATE TABLE tbl_created_outside_xact(id SERIAL PRIMARY KEY);
BEGIN;
-- TRUNCATE changes the relfilenode and sends relcache invalidation
TRUNCATE tbl_created_outside_xact;
INSERT INTO tbl_created_outside_xact(id) VALUES('1');
-- don't show yet, haven't committed
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------------------------------
BEGIN
table public.tbl_created_outside_xact: TRUNCATE: (no-flags)
table public.tbl_created_outside_xact: INSERT: id[integer]:1
COMMIT
(4 rows)
SET debug_logical_replication_streaming = immediate;
BEGIN;
CREATE TABLE tbl_created_in_xact(id SERIAL PRIMARY KEY);
INSERT INTO tbl_created_in_xact VALUES (1);
CHECKPOINT; -- Force WAL flush, so that the above changes will be streamed
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
------------------------------------------
opening a streamed block for transaction
streaming change for transaction
closing a streamed block for transaction
(3 rows)
COMMIT;
RESET debug_logical_replication_streaming;
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
---------------------------------------------------------
BEGIN
table public.tbl_created_in_xact: INSERT: id[integer]:1
COMMIT
(3 rows)
SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)
|