1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
-- Fetch the regression-suite settings and store each result column as a psql
-- variable via \gset; presumably this is where :provider_dsn and
-- :subscriber_dsn used by the \c commands below come from.
SELECT * FROM pglogical_regress_variables()
\gset
\c :provider_dsn
-- Create the test table through pglogical.replicate_ddl_command rather than
-- plain DDL; presumably this is what makes the table exist on the subscriber
-- too, where a trigger is attached to it further down.
SELECT pglogical.replicate_ddl_command($$
CREATE TABLE public.test_trg_data(id serial primary key, data text);
$$);
-- Add the new table to the 'default' replication set.
SELECT * FROM pglogical.replication_set_add_table('default', 'test_trg_data');
-- Wait for replication to catch up before switching nodes (going by the
-- function name: it waits on the slot's confirmed LSN).
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
-- Subscriber-only history table written by the trigger function defined next.
-- action_id is a serial, so it is assigned from a sequence on each insert.
CREATE TABLE test_trg_hist(table_name text, action text, action_id serial, original_data text, new_data text);
-- History (audit) trigger function.
-- For each row-level INSERT, UPDATE or DELETE it fires for, writes one row to
-- test_trg_hist holding the table name, the first letter of the operation,
-- and the old and/or new row image. Returns NEW for INSERT/UPDATE and OLD for
-- DELETE. Any other operation raises a warning and returns NULL.
CREATE FUNCTION test_trg_data_hist_fn() RETURNS TRIGGER AS $$
DECLARE
    src_table text := TG_TABLE_NAME::TEXT;
    op_code text := substring(TG_OP,1,1);
BEGIN
    CASE TG_OP
        WHEN 'INSERT' THEN
            -- Only a new row image exists.
            INSERT INTO test_trg_hist (table_name,action,new_data)
            VALUES (src_table, op_code, ROW(NEW.*));
            RETURN NEW;
        WHEN 'UPDATE' THEN
            -- Keep both the before and after images.
            INSERT INTO test_trg_hist (table_name,action,original_data,new_data)
            VALUES (src_table, op_code, ROW(OLD.*), ROW(NEW.*));
            RETURN NEW;
        WHEN 'DELETE' THEN
            -- Only the old row image exists.
            INSERT INTO test_trg_hist (table_name,action,original_data)
            VALUES (src_table, op_code, ROW(OLD.*));
            RETURN OLD;
        ELSE
            RAISE WARNING 'Unknown action';
            RETURN NULL;
    END CASE;
END;
$$ LANGUAGE plpgsql;
-- Attach the history function as a row-level AFTER trigger for all three DML
-- operations. It is created in the default enable state; it is switched to
-- REPLICA mode only further down.
CREATE TRIGGER test_trg_data_hist_trg
AFTER INSERT OR UPDATE OR DELETE ON test_trg_data
FOR EACH ROW EXECUTE PROCEDURE test_trg_data_hist_fn();
\c :provider_dsn
-- Replicated before the trigger is enabled for replica sessions; the
-- 'no_history' value marks a row that should leave no history entry.
INSERT INTO test_trg_data(data) VALUES ('no_history');
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
-- Explicit ORDER BY keeps the captured output independent of physical row
-- order, matching the basic_dml checks later in this file.
SELECT * FROM test_trg_data ORDER BY id;
SELECT * FROM test_trg_hist ORDER BY action_id;
-- ENABLE REPLICA makes the trigger fire only when session_replication_role
-- is 'replica' (presumably the mode the pglogical apply process runs in).
ALTER TABLE test_trg_data ENABLE REPLICA TRIGGER test_trg_data_hist_trg;
\c :provider_dsn
INSERT INTO test_trg_data(data) VALUES ('yes_history');
-- No WHERE clause on the next two statements: they touch every row in the
-- table, so each replicated row change can produce its own history entry.
UPDATE test_trg_data SET data = 'yes_history';
DELETE FROM test_trg_data;
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
-- Ordered by the keys again so the result listing is deterministic;
-- action_id reflects the order in which history rows were written.
SELECT * FROM test_trg_data ORDER BY id;
SELECT * FROM test_trg_hist ORDER BY action_id;
-- The history table is no longer needed on the subscriber.
DROP TABLE test_trg_hist CASCADE;
-- Second scenario: a table whose incoming rows are filtered by a BEFORE
-- trigger on the subscriber. The table is created via replicate_ddl_command,
-- in the same way as test_trg_data above.
\c :provider_dsn
SELECT pglogical.replicate_ddl_command($$
CREATE TABLE public.basic_dml (
id serial primary key,
other integer,
data text,
something interval
);
$$);
-- Add the table to the 'default' replication set, then wait for the slot to
-- confirm before moving to the subscriber.
SELECT * FROM pglogical.replication_set_add_table('default', 'basic_dml');
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
-- create row filter trigger
-- Row filter for basic_dml, written to run as a BEFORE ROW trigger.
-- For INSERT and UPDATE the new row is returned only when its id is greater
-- than 1 and its data is neither 'baz' nor 'bbb' (NULL data counts as
-- distinct from both); otherwise NULL is returned, which makes a BEFORE ROW
-- trigger skip the operation for that row.
-- There is no DELETE-specific branch: any other operation raises a warning
-- and returns NULL.
CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS $$
DECLARE
    keep_row boolean;
BEGIN
    IF TG_OP IN ('UPDATE', 'INSERT') THEN
        -- Evaluated only here, where NEW is defined.
        keep_row := NEW.id > 1
            AND NEW.data IS DISTINCT FROM 'baz'
            AND NEW.data IS DISTINCT FROM 'bbb';
        IF keep_row THEN
            RETURN NEW;
        END IF;
        -- A false or NULL predicate both mean "filter this row out".
        RETURN NULL;
    END IF;

    RAISE WARNING 'Unknown action';
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Attach the filter as a BEFORE ROW trigger for INSERT and UPDATE only; no
-- DELETE event is registered. It is created in the default enable state and
-- switched to REPLICA mode at the end of this section.
CREATE TRIGGER filter_basic_dml_trg
BEFORE INSERT OR UPDATE ON basic_dml
FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
\c :provider_dsn
-- insert into table at provider
-- CSV fields follow the table's column order: id, other, data, something.
-- The ids are given explicitly (5000-5003) instead of coming from the serial.
\COPY basic_dml FROM STDIN WITH CSV
5000,1,aaa,1 hour
5001,2,bbb,2 years
5002,3,ccc,3 minutes
5003,4,ddd,4 days
\.
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
-- rows received at subscriber as trigger is not enabled for replica sessions yet.
SELECT * from basic_dml ORDER BY id;
-- Now enable trigger:
-- ENABLE REPLICA makes it fire only when session_replication_role is 'replica'.
ALTER TABLE basic_dml ENABLE REPLICA TRIGGER filter_basic_dml_trg;
\c :provider_dsn
-- Empty the table on the provider before the filtered run. No RESTART
-- IDENTITY is given, so the id sequence is not reset by this statement.
TRUNCATE basic_dml;
-- check basic insert replication
-- id is omitted, so it comes from the serial column's default. The batch
-- includes a 'baz' row and a row with NULL data to exercise the filter.
INSERT INTO basic_dml(other, data, something)
VALUES (5, 'foo', '1 minute'::interval),
(4, 'bar', '12 weeks'::interval),
(3, 'baz', '2 years 1 hour'::interval),
(2, 'qux', '8 months 2 days'::interval),
(1, NULL, NULL);
SELECT pglogical.wait_slot_confirm_lsn(NULL, NULL);
\c :subscriber_dsn
-- rows filtered at subscriber as trigger is enabled.
-- The filter keeps only rows with id > 1 whose data is neither 'baz' nor 'bbb'.
SELECT * from basic_dml ORDER BY id;
-- Terse verbosity for the cleanup; presumably this keeps the messages printed
-- by the CASCADE drops short and stable in the captured output.
\set VERBOSITY terse
-- CASCADE also removes the triggers that depend on these functions.
DROP FUNCTION test_trg_data_hist_fn() CASCADE;
DROP FUNCTION filter_basic_dml_fn() CASCADE;
\c :provider_dsn
\set VERBOSITY terse
-- Drop both test tables through replicate_ddl_command, mirroring how they
-- were created.
SELECT pglogical.replicate_ddl_command($$
DROP TABLE public.test_trg_data CASCADE;
DROP TABLE public.basic_dml CASCADE;
$$);
|