1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
SELECT * FROM pglogical_regress_variables()
\gset
\c :provider_dsn
SELECT pglogical.replicate_ddl_command($$
CREATE TABLE public.test_trg_data(id serial primary key, data text);
$$);
replicate_ddl_command
-----------------------
t
(1 row)
SELECT * FROM pglogical.replication_set_add_table('default', 'test_trg_data');
replication_set_add_table
---------------------------
t
(1 row)
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
wait_slot_confirm_lsn
-----------------------
(1 row)
\c :subscriber_dsn
CREATE TABLE test_trg_hist(table_name text, action text, action_id serial, original_data text, new_data text);
CREATE FUNCTION test_trg_data_hist_fn() RETURNS TRIGGER AS $$
BEGIN
IF (TG_OP = 'UPDATE') THEN
INSERT INTO test_trg_hist (table_name,action,original_data,new_data)
VALUES (TG_TABLE_NAME::TEXT, substring(TG_OP,1,1), ROW(OLD.*), ROW(NEW.*));
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
INSERT INTO test_trg_hist (table_name,action,original_data)
VALUES (TG_TABLE_NAME::TEXT, substring(TG_OP,1,1), ROW(OLD.*));
RETURN OLD;
ELSIF (TG_OP = 'INSERT') THEN
INSERT INTO test_trg_hist (table_name,action,new_data)
VALUES (TG_TABLE_NAME::TEXT, substring(TG_OP,1,1), ROW(NEW.*));
RETURN NEW;
ELSE
RAISE WARNING 'Unknown action';
RETURN NULL;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER test_trg_data_hist_trg
AFTER INSERT OR UPDATE OR DELETE ON test_trg_data
FOR EACH ROW EXECUTE PROCEDURE test_trg_data_hist_fn();
\c :provider_dsn
INSERT INTO test_trg_data(data) VALUES ('no_history');
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
wait_slot_confirm_lsn
-----------------------
(1 row)
\c :subscriber_dsn
SELECT * FROM test_trg_data;
id | data
----+------------
1 | no_history
(1 row)
SELECT * FROM test_trg_hist;
table_name | action | action_id | original_data | new_data
------------+--------+-----------+---------------+----------
(0 rows)
ALTER TABLE test_trg_data ENABLE REPLICA TRIGGER test_trg_data_hist_trg;
\c :provider_dsn
INSERT INTO test_trg_data(data) VALUES ('yes_history');
UPDATE test_trg_data SET data = 'yes_history';
DELETE FROM test_trg_data;
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
wait_slot_confirm_lsn
-----------------------
(1 row)
\c :subscriber_dsn
SELECT * FROM test_trg_data;
id | data
----+------
(0 rows)
SELECT * FROM test_trg_hist;
table_name | action | action_id | original_data | new_data
---------------+--------+-----------+-----------------+-----------------
test_trg_data | I | 1 | | (2,yes_history)
test_trg_data | U | 2 | (1,no_history) | (1,yes_history)
test_trg_data | U | 3 | (2,yes_history) | (2,yes_history)
test_trg_data | D | 4 | (1,yes_history) |
test_trg_data | D | 5 | (2,yes_history) |
(5 rows)
DROP TABLE test_trg_hist CASCADE;
\c :provider_dsn
SELECT pglogical.replicate_ddl_command($$
CREATE TABLE public.basic_dml (
id serial primary key,
other integer,
data text,
something interval
);
$$);
replicate_ddl_command
-----------------------
t
(1 row)
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml');
replication_set_add_table
---------------------------
t
(1 row)
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
wait_slot_confirm_lsn
-----------------------
(1 row)
\c :subscriber_dsn
-- create row filter trigger
CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS $$
BEGIN
IF (TG_OP in ('UPDATE', 'INSERT')) THEN
-- treating 'DELETE' as pass-through
IF (NEW.id > 1 AND NEW.data IS DISTINCT FROM 'baz' AND NEW.data IS DISTINCT FROM 'bbb') THEN
RETURN NEW;
ELSE
RETURN NULL;
END IF;
ELSE
RAISE WARNING 'Unknown action';
RETURN NULL;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER filter_basic_dml_trg
BEFORE INSERT OR UPDATE ON basic_dml
FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
\c :provider_dsn
-- insert into table at provider
\COPY basic_dml FROM STDIN WITH CSV
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
wait_slot_confirm_lsn
-----------------------
(1 row)
\c :subscriber_dsn
-- rows received at suscriber as trigger is not enabled yet.
SELECT * from basic_dml ORDER BY id;
id | other | data | something
------+-------+------+-----------
5000 | 1 | aaa | @ 1 hour
5001 | 2 | bbb | @ 2 years
5002 | 3 | ccc | @ 3 mins
5003 | 4 | ddd | @ 4 days
(4 rows)
-- Now enable trigger:
ALTER TABLE basic_dml ENABLE REPLICA TRIGGER filter_basic_dml_trg;
\c :provider_dsn
TRUNCATE basic_dml;
-- check basic insert replication
INSERT INTO basic_dml(other, data, something)
VALUES (5, 'foo', '1 minute'::interval),
(4, 'bar', '12 weeks'::interval),
(3, 'baz', '2 years 1 hour'::interval),
(2, 'qux', '8 months 2 days'::interval),
(1, NULL, NULL);
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
wait_slot_confirm_lsn
-----------------------
(1 row)
\c :subscriber_dsn
-- rows filtered at suscriber as trigger is enabled.
SELECT * from basic_dml ORDER BY id;
id | other | data | something
----+-------+------+-----------------
2 | 4 | bar | @ 84 days
4 | 2 | qux | @ 8 mons 2 days
5 | 1 | |
(3 rows)
\set VERBOSITY terse
DROP FUNCTION test_trg_data_hist_fn() CASCADE;
NOTICE: drop cascades to trigger test_trg_data_hist_trg on table test_trg_data
DROP FUNCTION filter_basic_dml_fn() CASCADE;
NOTICE: drop cascades to trigger filter_basic_dml_trg on table basic_dml
\c :provider_dsn
\set VERBOSITY terse
SELECT pglogical.replicate_ddl_command($$
DROP TABLE public.test_trg_data CASCADE;
DROP TABLE public.basic_dml CASCADE;
$$);
NOTICE: drop cascades to table public.test_trg_data membership in replication set default
NOTICE: drop cascades to table public.basic_dml membership in replication set default
replicate_ddl_command
-----------------------
t
(1 row)
|