1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260
|
create or replace function londiste.create_trigger(
    in i_queue_name     text,
    in i_table_name     text,
    in i_trg_args       text[],
    in i_dest_table     text,
    in i_node_type      text,
    out ret_code        int4,
    out ret_note        text,
    out trigger_name    text)
as $$
------------------------------------------------------------------------
-- Function: londiste.create_trigger(5)
--
--      Create or replace londiste trigger(s)
--
-- Parameters:
--      i_queue_name - queue name
--      i_table_name - table name
--      i_trg_args   - args to trigger
--      i_dest_table - actual name of destination table (NULL if same as src)
--      i_node_type  - l3 node type
--
-- Trigger args:
--      See documentation for pgq triggers.
--
--      Handled here and not passed on to the trigger function:
--      tgflags=..., no_triggers, virtual_table, skip (any case),
--      expect_sync, skip_truncate, merge_all, no_merge.
--
-- Trigger creation flags (default: AIUDL):
--      I - ON INSERT
--      U - ON UPDATE
--      D - ON DELETE
--      Q - use pgq.sqltriga() as trigger function
--      L - use pgq.logutriga() as trigger function
--      J - use pgq.jsontriga() as trigger function
--      B - BEFORE
--      A - AFTER
--      S - SKIP
--
-- Output:
--      ret_code     - result code, see below
--      ret_note     - human readable result
--      trigger_name - name of the row-level trigger; set only when
--                     ret_code is 200, NULL otherwise
--
-- Returns:
--      200 - Ok
--      201 - Trigger not created
--      403 - AFTER trigger cannot work with SKIP trigger(s)
--      405 - Multiple SKIP triggers
--
------------------------------------------------------------------------
declare
    -- name of the row trigger; kept in a private variable so the OUT
    -- parameter "trigger_name" is not shadowed and can be filled on success
    _trg_name           text;
    -- effective destination table: NULL i_dest_table means "same as source"
    _dest_table         text := coalesce(i_dest_table, i_table_name);
    lg_func             text;
    lg_pos              text;
    lg_event            text;
    lg_args             text[];
    _old_tgargs         bytea;
    _new_tgargs         bytea;
    trunctrg_name       text;
    pgversion           int;
    sql                 text;
    arg                 text;
    _extra_args         text[] := '{}';
    -- skip trigger
    _skip_prefix        text := 'zzz_';
    _skip_trg_count     integer;
    _skip_trg_name      text;
    -- given tgflags array
    _tgflags            char[];
    -- ordinary argument array
    _args               text[];
    -- array with all valid tgflags values
    _valid_flags        char[] := array['B','A','Q','L','J','I','U','D','S'];
    -- argument flags
    _skip               boolean := false;
    _no_triggers        boolean := false;
    _got_extra1         boolean := false;
begin
    -- parse trigger args
    if array_lower(i_trg_args, 1) is not null then
        for i in array_lower(i_trg_args, 1) .. array_upper(i_trg_args, 1) loop
            arg := i_trg_args[i];
            if arg like 'tgflags=%' then
                -- special flag handling: keep every valid flag letter
                -- found after the 'tgflags=' prefix (case-insensitive)
                arg := upper(substr(arg, 9));
                for j in array_lower(_valid_flags, 1) .. array_upper(_valid_flags, 1) loop
                    if position(_valid_flags[j] in arg) > 0 then
                        _tgflags := array_append(_tgflags, _valid_flags[j]);
                    end if;
                end loop;
            elsif arg = 'no_triggers' then
                _no_triggers := true;
            elsif lower(arg) = 'skip' then
                _skip := true;
            elsif arg = 'virtual_table' then
                _no_triggers := true;   -- do not create triggers
            elsif arg not in ('expect_sync', 'skip_truncate', 'merge_all', 'no_merge') then -- ignore add-table args
                if arg like 'ev_extra1=%' then
                    _got_extra1 := true;
                end if;
                -- ordinary arg
                _args := array_append(_args, quote_literal(arg));
            end if;
        end loop;
    end if;

    if _no_triggers then
        select 201, 'Trigger not created'
        into ret_code, ret_note;
        return;
    end if;

    if _dest_table <> i_table_name and not _got_extra1 then
        -- if renamed table, enforce trigger to put
        -- global table name into extra1
        arg := 'ev_extra1=' || quote_literal(i_table_name);
        _args := array_append(_args, quote_literal(arg));
    end if;

    -- defaults: AFTER trigger using pgq.logutriga, events decided below
    _trg_name := '_londiste_' || i_queue_name;
    lg_func := 'pgq.logutriga';
    lg_event := '';
    lg_args := array[quote_literal(i_queue_name)];
    lg_pos := 'after';

    if array_lower(_args, 1) is not null then
        lg_args := lg_args || _args;
    end if;

    -- apply tgflags; when conflicting flags are given the later check wins
    -- (A over B, J over L over Q)
    if 'B' = any(_tgflags) then
        lg_pos := 'before';
    end if;
    if 'A' = any(_tgflags) then
        lg_pos := 'after';
    end if;
    if 'Q' = any(_tgflags) then
        lg_func := 'pgq.sqltriga';
    end if;
    if 'L' = any(_tgflags) then
        lg_func := 'pgq.logutriga';
    end if;
    if 'J' = any(_tgflags) then
        lg_func := 'pgq.jsontriga';
    end if;
    if 'I' = any(_tgflags) then
        lg_event := lg_event || ' or insert';
    end if;
    if 'U' = any(_tgflags) then
        lg_event := lg_event || ' or update';
    end if;
    if 'D' = any(_tgflags) then
        lg_event := lg_event || ' or delete';
    end if;
    if 'S' = any(_tgflags) then
        _skip := true;
    end if;

    if i_node_type = 'leaf' then
        -- on weird leafs the trigger funcs may not exist
        perform 1 from pg_proc p join pg_namespace n on (n.oid = p.pronamespace)
            where n.nspname = 'pgq' and p.proname in ('logutriga', 'sqltriga', 'jsontriga');
        if not found then
            select 201, 'Trigger not created' into ret_code, ret_note;
            return;
        end if;
        -- on regular leaf, install deny trigger
        _extra_args := array_append(_extra_args, quote_literal('deny'));
    end if;

    if _skip or lg_pos = 'after' then
        -- get count and name of existing skip triggers
        select count(*), min(t.tgname)
        into _skip_trg_count, _skip_trg_name
        from pg_catalog.pg_trigger t
        where t.tgrelid = londiste.find_table_oid(_dest_table)
            and position(E'\\000SKIP\\000'::bytea in tgargs) > 0;
    end if;

    -- make sure new trigger won't be effectively inactive
    if lg_pos = 'after' and _skip_trg_count > 0 then
        select 403, 'AFTER trigger cannot work with SKIP trigger(s)'
        into ret_code, ret_note;
        return;
    end if;

    -- if skip param given, rename previous skip triggers and prefix current
    if _skip then
        -- if no previous skip triggers, prefix name and add SKIP to args
        if _skip_trg_count = 0 then
            _trg_name := _skip_prefix || _trg_name;
            lg_args := array_append(lg_args, quote_literal('SKIP'));
        -- if one previous skip trigger, check its prefix and
        -- do not use SKIP on current trigger
        elsif _skip_trg_count = 1 then
            -- if not prefixed then rename
            if position(_skip_prefix in _skip_trg_name) != 1 then
                -- identifiers are quoted so that mixed-case or otherwise
                -- unusual trigger names cannot break or alter the statement
                sql := 'alter trigger ' || quote_ident(_skip_trg_name)
                    || ' on ' || londiste.quote_fqname(_dest_table)
                    || ' rename to ' || quote_ident(_skip_prefix || _skip_trg_name);
                execute sql;
            end if;
        else
            select 405, 'Multiple SKIP triggers'
            into ret_code, ret_note;
            return;
        end if;
    end if;

    -- create Ins/Upd/Del trigger if it does not exist already
    select t.tgargs
        from pg_catalog.pg_trigger t
        where t.tgrelid = londiste.find_table_oid(_dest_table)
            and t.tgname = _trg_name
        into _old_tgargs;
    if found then
        -- rebuild the argument list in a tgargs-like form (\000 separated)
        -- NOTE(review): lg_args holds quote_literal()-ed values and no
        -- trailing \000 is appended - confirm whether this can ever equal
        -- the stored tgargs, or whether the trigger is always recreated.
        _new_tgargs := decode(lg_args[1], 'escape');
        for i in 2 .. array_upper(lg_args, 1) loop
            _new_tgargs := _new_tgargs || E'\\000'::bytea || decode(lg_args[i], 'escape');
        end loop;
        if _old_tgargs is distinct from _new_tgargs then
            sql := 'drop trigger if exists ' || quote_ident(_trg_name)
                || ' on ' || londiste.quote_fqname(_dest_table);
            execute sql;
        end if;
    end if;
    -- FOUND still reflects the pg_trigger lookup above when no trigger
    -- existed; when one existed, the args comparison decides
    if not found or _old_tgargs is distinct from _new_tgargs then
        -- finalize event
        lg_event := substr(lg_event, 4); -- remove ' or '
        if lg_event = '' then
            lg_event := 'insert or update or delete';
        end if;

        -- create trigger
        lg_args := lg_args || _extra_args;
        sql := 'create trigger ' || quote_ident(_trg_name)
            || ' ' || lg_pos || ' ' || lg_event
            || ' on ' || londiste.quote_fqname(_dest_table)
            || ' for each row execute procedure '
            || lg_func || '(' || array_to_string(lg_args, ', ') || ')';
        execute sql;
    end if;

    -- create truncate trigger if it does not exist already
    show server_version_num into pgversion;
    if pgversion >= 80400 then
        trunctrg_name  := '_londiste_' || i_queue_name || '_truncate';
        perform 1 from pg_catalog.pg_trigger
            where tgrelid = londiste.find_table_oid(_dest_table)
                and tgname = trunctrg_name;
        if not found then
            -- truncate trigger args: queue name followed by the extra args
            _extra_args := quote_literal(i_queue_name) || _extra_args;
            sql := 'create trigger ' || quote_ident(trunctrg_name)
                || ' after truncate on ' || londiste.quote_fqname(_dest_table)
                || ' for each statement execute procedure pgq.sqltriga('
                || array_to_string(_extra_args, ', ') || ')';
            execute sql;
        end if;
    end if;

    -- report the row trigger name through the OUT parameter on success
    trigger_name := _trg_name;
    select 200, 'OK'
    into ret_code, ret_note;
    return;
end;
$$ language plpgsql;
|