1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
-- Commit synchronously so that the results below are predictable.
SET synchronous_commit TO on;
SELECT 'init'
FROM pg_create_logical_replication_slot('regression_slot_stats1', 'test_decoding') AS s1
CROSS JOIN pg_create_logical_replication_slot('regression_slot_stats2', 'test_decoding') AS s2
CROSS JOIN pg_create_logical_replication_slot('regression_slot_stats3', 'test_decoding') AS s3;
?column?
----------
init
(1 row)
CREATE TABLE stats_test (data text);
-- A one-row transaction decoded with a 64MB work_mem: it is not meant to spill.
SET logical_decoding_work_mem = '64MB';
INSERT INTO stats_test (data) VALUES (1);
-- Decode the pending changes through each of the three slots in turn; every
-- call below is expected to return the same row count.
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL, NULL, 'skip-empty-xacts', '1');
count
-------
3
(1 row)
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
count
-------
3
(1 row)
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
count
-------
3
(1 row)
-- Request a stats flush before reading pg_stat_replication_slots
-- (presumably so the counters checked next reflect the decoding above).
SELECT pg_stat_force_next_flush();
pg_stat_force_next_flush
--------------------------
(1 row)
-- No slot may have spilled, while the totals must show that work was decoded.
SELECT
    slot_name,
    spill_txns = 0 AS spill_txns,
    spill_count = 0 AS spill_count,
    total_txns > 0 AS total_txns,
    total_bytes > 0 AS total_bytes
FROM pg_stat_replication_slots
ORDER BY slot_name;
slot_name | spill_txns | spill_count | total_txns | total_bytes
------------------------+------------+-------------+------------+-------------
regression_slot_stats1 | t | t | t | t
regression_slot_stats2 | t | t | t | t
regression_slot_stats3 | t | t | t | t
(3 rows)
-- Undo the 64MB override that was set for the non-spilled transaction.
RESET logical_decoding_work_mem;
-- reset stats for one slot, others should be unaffected
SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
pg_stat_reset_replication_slot
--------------------------------
(1 row)
-- Only regression_slot_stats1 is expected to show cleared totals here.
SELECT
    slot_name,
    spill_txns = 0 AS spill_txns,
    spill_count = 0 AS spill_count,
    total_txns > 0 AS total_txns,
    total_bytes > 0 AS total_bytes
FROM pg_stat_replication_slots
ORDER BY slot_name;
slot_name | spill_txns | spill_count | total_txns | total_bytes
------------------------+------------+-------------+------------+-------------
regression_slot_stats1 | t | t | f | f
regression_slot_stats2 | t | t | t | t
regression_slot_stats3 | t | t | t | t
(3 rows)
-- reset stats for all slots (a NULL slot name is passed instead of a
-- specific slot)
SELECT pg_stat_reset_replication_slot(NULL);
pg_stat_reset_replication_slot
--------------------------------
(1 row)
-- After the global reset every slot is expected to show cleared totals.
SELECT
    slot_name,
    spill_txns = 0 AS spill_txns,
    spill_count = 0 AS spill_count,
    total_txns > 0 AS total_txns,
    total_bytes > 0 AS total_bytes
FROM pg_stat_replication_slots
ORDER BY slot_name;
slot_name | spill_txns | spill_count | total_txns | total_bytes
------------------------+------------+-------------+------------+-------------
regression_slot_stats1 | t | t | f | f
regression_slot_stats2 | t | t | f | f
regression_slot_stats3 | t | t | f | f
(3 rows)
-- verify accessing/resetting stats for non-existent slot does something reasonable:
-- reading its stats is expected to return a single row of zeroes, while
-- resetting them is expected to raise an error.
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset
--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
(1 row)
SELECT pg_stat_reset_replication_slot('do-not-exist');
ERROR: replication slot "do-not-exist" does not exist
SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset
--------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
(1 row)
-- A transaction whose changes are meant to spill while being decoded.
BEGIN;
INSERT INTO stats_test
SELECT 'serialize-topbig--1:' || g.i
FROM generate_series(1, 5000) AS g(i);
COMMIT;
SELECT count(*)
FROM pg_logical_slot_peek_changes('regression_slot_stats1', NULL, NULL, 'skip-empty-xacts', '1');
count
-------
5002
(1 row)
-- Check stats. We can't test the exact stats count as that can vary if any
-- background transaction (say by autovacuum) happens in parallel to the main
-- transaction.
SELECT pg_stat_force_next_flush();
pg_stat_force_next_flush
--------------------------
(1 row)
-- Only the slot that decoded the large transaction should report spilling.
-- ORDER BY slot_name pins the row order that the expected output relies on,
-- matching the earlier checks against pg_stat_replication_slots.
SELECT
    slot_name,
    spill_txns > 0 AS spill_txns,
    spill_count > 0 AS spill_count
FROM pg_stat_replication_slots
ORDER BY slot_name;
slot_name | spill_txns | spill_count
------------------------+------------+-------------
regression_slot_stats1 | t | t
regression_slot_stats2 | f | f
regression_slot_stats3 | f | f
(3 rows)
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
BEGIN;
-- Read the view twice inside one transaction. ORDER BY slot_name makes the
-- expected row order independent of the order in which the view happens to
-- return the slots.
SELECT slot_name FROM pg_stat_replication_slots ORDER BY slot_name;
slot_name
------------------------
regression_slot_stats1
regression_slot_stats2
regression_slot_stats3
(3 rows)
SELECT slot_name FROM pg_stat_replication_slots ORDER BY slot_name;
slot_name
------------------------
regression_slot_stats1
regression_slot_stats2
regression_slot_stats3
(3 rows)
COMMIT;
-- Create a fourth slot for the prepared-transaction test that follows.
-- NOTE(review): the trailing arguments look like temporary = false,
-- twophase = true -- confirm against the function's documented signature.
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_stats4_twophase', 'test_decoding', false, true) s4;
?column?
----------
init
(1 row)
-- The rows inserted here are big enough to spill, yet they must not be: the
-- transaction is aborted, and logical decoding stops collecting further
-- changes for it. Preparing the transaction first makes sure that decoding
-- actually processes the aborted transaction.
BEGIN;
INSERT INTO stats_test
SELECT 'serialize-toobig--1:' || g.i
FROM generate_series(1, 5000) AS g(i);
PREPARE TRANSACTION 'test1_abort';
ROLLBACK PREPARED 'test1_abort';
SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
count
-------
1
(1 row)
-- Verify that the decoding doesn't spill already-aborted transaction's changes.
SELECT pg_stat_force_next_flush();
pg_stat_force_next_flush
--------------------------
(1 row)
-- Exact zeroes are expected: nothing of the aborted transaction may spill.
SELECT
    slot_name,
    spill_txns,
    spill_count
FROM pg_stat_replication_slots
WHERE slot_name = 'regression_slot_stats4_twophase';
slot_name | spill_txns | spill_count
---------------------------------+------------+-------------
regression_slot_stats4_twophase | 0 | 0
(1 row)
-- Clean up: remove the test table and all four replication slots created above.
DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot_stats1'),
pg_drop_replication_slot('regression_slot_stats2'),
pg_drop_replication_slot('regression_slot_stats3'),
pg_drop_replication_slot('regression_slot_stats4_twophase');
pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot | pg_drop_replication_slot
--------------------------+--------------------------+--------------------------+--------------------------
| | |
(1 row)
|