1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
-- row based filtering
SELECT * FROM pglogical_regress_variables()
\gset
\c :provider_dsn
-- testing volatile sampling function in row_filter
SELECT pglogical.replicate_ddl_command($$
CREATE TABLE public.test_tablesample (id int primary key, name text) WITH (fillfactor=10);
$$);
replicate_ddl_command
-----------------------
t
(1 row)
-- use fillfactor so we don't have to load too much data to get multiple pages
INSERT INTO test_tablesample
SELECT i, repeat(i::text, 200) FROM generate_series(0, 9) s(i);
create or replace function funcn_get_system_sample_count(integer, integer) returns bigint as
$$ (SELECT count(*) FROM test_tablesample TABLESAMPLE SYSTEM ($1) REPEATABLE ($2)); $$
language sql volatile;
create or replace function funcn_get_bernoulli_sample_count(integer, integer) returns bigint as
$$ (SELECT count(*) FROM test_tablesample TABLESAMPLE BERNOULLI ($1) REPEATABLE ($2)); $$
language sql volatile;
SELECT * FROM pglogical.replication_set_add_table('default', 'test_tablesample', false, row_filter := $rf$id > funcn_get_system_sample_count(100, 3) $rf$);
replication_set_add_table
---------------------------
t
(1 row)
SELECT * FROM pglogical.replication_set_remove_table('default', 'test_tablesample');
replication_set_remove_table
------------------------------
t
(1 row)
SELECT * FROM pglogical.replication_set_add_table('default', 'test_tablesample', true, row_filter := $rf$id > funcn_get_bernoulli_sample_count(10, 0) $rf$);
replication_set_add_table
---------------------------
t
(1 row)
SELECT * FROM test_tablesample ORDER BY id limit 5;
id | name
----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
1 | 11111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
2 | 22222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222
3 | 33333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333
4 | 44444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444
(5 rows)
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
wait_slot_confirm_lsn
-----------------------
(1 row)
\c :subscriber_dsn
BEGIN;
SET LOCAL statement_timeout = '180s';
SELECT pglogical.wait_for_table_sync_complete('test_subscription', 'test_tablesample');
wait_for_table_sync_complete
------------------------------
(1 row)
COMMIT;
SELECT sync_kind, sync_nspname, sync_relname, sync_status FROM pglogical.local_sync_status WHERE sync_relname = 'test_tablesample';
sync_kind | sync_nspname | sync_relname | sync_status
-----------+--------------+------------------+-------------
d | public | test_tablesample | r
(1 row)
SELECT * FROM test_tablesample ORDER BY id limit 5;
id | name
----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
3 | 33333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333333
4 | 44444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444444
5 | 55555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555555
6 | 66666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666666
7 | 77777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777777
(5 rows)
\c :provider_dsn
\set VERBOSITY terse
DROP FUNCTION funcn_get_system_sample_count(integer, integer);
DROP FUNCTION funcn_get_bernoulli_sample_count(integer, integer);
SELECT pglogical.replicate_ddl_command($$
DROP TABLE public.test_tablesample CASCADE;
$$);
NOTICE: drop cascades to table public.test_tablesample membership in replication set default
replicate_ddl_command
-----------------------
t
(1 row)
CREATE TABLE sample_rowfilter_table(id int primary key, name text);
\COPY sample_rowfilter_table(id, name) FROM STDIN WITH CSV
SELECT pglogical.create_replication_set('sample_publisher_set', true, true, true, true);
create_replication_set
------------------------
1195464429
(1 row)
SELECT pglogical.replication_set_add_table(set_name := 'sample_publisher_set', relation := 'public.sample_rowfilter_table', row_filter := $$ id >= 1 and id <= 3 $$);
replication_set_add_table
---------------------------
t
(1 row)
SELECT * FROM pglogical.table_data_filtered(NULL::"public"."sample_rowfilter_table", '"public"."sample_rowfilter_table"'::regclass, ARRAY['sample_publisher_set']);
id | name
----+------
1 | John
2 | Jane
3 | Bob
(3 rows)
-- Try to trigger cache invalidation for the sample_rowfilter_table relation
-- while program execution is inside create_estate_for_relation(), called from
-- pglogical_table_data_filtered(). To reach that reliably, one can run the
-- test suite under debug_discard_caches=1. This helps verify row_filter is
-- applied correctly even during cache invalidation.
SELECT pglogical.replication_set_remove_table('sample_publisher_set', '"public"."sample_rowfilter_table"'::regclass);
replication_set_remove_table
------------------------------
t
(1 row)
SELECT pglogical.replication_set_add_table(set_name := 'sample_publisher_set', relation := 'public.sample_rowfilter_table', row_filter := $$ id >= 4 and id <= 6 $$);
replication_set_add_table
---------------------------
t
(1 row)
SELECT * FROM pglogical.table_data_filtered(NULL::"public"."sample_rowfilter_table", '"public"."sample_rowfilter_table"'::regclass, ARRAY['sample_publisher_set']);
id | name
----+---------
4 | Alice
5 | Charlie
6 | Eve
(3 rows)
SELECT pglogical.drop_replication_set('sample_publisher_set');
drop_replication_set
----------------------
t
(1 row)
DROP TABLE sample_rowfilter_table;
|