1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
|
create or replace function pgq.sqltriga() returns trigger as $$
-- ----------------------------------------------------------------------
-- Function: pgq.sqltriga()
--
--      Trigger function that puts row data in SQL-fragment form into queue.
--
-- Purpose:
--      Ancient way to implement replication.
--
-- Trigger parameters:
--      arg1 - queue name
--      argX - any number of optional arg, in any order
--
-- Optional arguments:
--      SKIP                - The actual operation should be skipped (BEFORE trigger);
--                            the trigger returns NULL after queueing the event.
--      deny                - Raise an exception on any change, unless
--                            session_replication_role is 'local'.
--      ignore=col1[,col2]  - Don't look at the specified columns.  An UPDATE that
--                            changes only ignored columns produces no event.
--      pkey=col1[,col2]    - Set pkey fields for the table, autodetection will be skipped.
--      backup              - On UPDATE, put the SQL fragment rendered from the old
--                            row into ev_extra2.
--      colname=EXPR        - Override field value with SQL expression.  Can reference
--                            table columns.  colname can be: ev_type, ev_extra1 .. ev_extra4.
--      when=EXPR           - If EXPR returns false or NULL, don't insert event.
--
-- Magic columns:
--      Table columns named _pgq_ev_type, _pgq_ev_extra1 .. _pgq_ev_extra4 are
--      left out of ev_data; their values override the matching event field.
--
-- Queue event fields:
--      ev_type   - 'I', 'U', 'D' for row changes, 'R' for TRUNCATE
--      ev_data   - partial SQL statement:
--                    INSERT: (col1,col2) values ('v1','v2')
--                    UPDATE: col='v',... where pkcol='pk' and ...
--                    DELETE: pkcol='pk' and ...
--                    TRUNCATE: empty string
--      ev_extra1 - table name (schema.table)
--      ev_extra2 - optional backup of the old row (see 'backup')
--
-- Regular listen trigger example:
-- >   CREATE TRIGGER triga_nimi AFTER INSERT OR UPDATE ON customer
-- >   FOR EACH ROW EXECUTE PROCEDURE pgq.sqltriga('qname');
--
-- Redirect trigger example:
-- >   CREATE TRIGGER triga_nimi BEFORE INSERT OR UPDATE ON customer
-- >   FOR EACH ROW EXECUTE PROCEDURE pgq.sqltriga('qname', 'SKIP');
-- ----------------------------------------------------------------------
declare
    qname text;
    ev_type text;
    ev_data text;
    ev_extra1 text;
    ev_extra2 text;
    ev_extra3 text;
    ev_extra4 text;
    do_skip boolean := false;
    do_backup boolean := false;
    do_insert boolean := true;
    do_deny boolean := false;
    extra_ignore_list text[];
    ignore_list text[] := '{}';
    pkey_list text[];
    pkey_str text;
    field_sql_sfx text;
    field_sql text[] := '{}';
    data_sql text;
    ignore_col_changes int4 := 0;
begin
    if TG_NARGS < 1 then
        raise exception 'Trigger needs queue name';
    end if;
    qname := TG_ARGV[0];

    -- standard output
    ev_extra1 := TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME;

    -- prepare to handle magic fields; every field_sql entry is a query of
    -- the form "select '<key>'::text as key, (<expr>)::text as val from (select $1.*) r"
    field_sql_sfx := ')::text as val from (select $1.*) r';
    extra_ignore_list := array['_pgq_ev_type', '_pgq_ev_extra1', '_pgq_ev_extra2',
                               '_pgq_ev_extra3', '_pgq_ev_extra4']::text[];

    -- parse trigger args
    declare
        got boolean;
        argpair text[];
    begin
        for i in 1 .. TG_NARGS-1 loop
            if TG_ARGV[i] in ('skip', 'SKIP') then
                do_skip := true;
            elsif TG_ARGV[i] = 'backup' then
                do_backup := true;
            elsif TG_ARGV[i] = 'deny' then
                do_deny := true;
            else
                -- everything else must look like key=value
                got := false;
                for argpair in select regexp_matches(TG_ARGV[i], '^([^=]+)=(.*)') loop
                    got := true;
                    if argpair[1] = 'pkey' then
                        pkey_str := argpair[2];
                        pkey_list := string_to_array(pkey_str, ',');
                    elsif argpair[1] = 'ignore' then
                        ignore_list := string_to_array(argpair[2], ',');
                    elsif argpair[1] ~ '^ev_(type|extra[1-4])$' then
                        field_sql := array_append(field_sql, 'select ' || quote_literal(argpair[1])
                            || '::text as key, (' || argpair[2] || field_sql_sfx);
                    elsif argpair[1] = 'when' then
                        -- a NULL value for key 'when' later means "do not insert"
                        field_sql := array_append(field_sql, 'select ' || quote_literal(argpair[1])
                            || '::text as key, (case when (' || argpair[2]
                            || ')::boolean then ''proceed'' else null end' || field_sql_sfx);
                    else
                        got := false;
                    end if;
                end loop;
                if not got then
                    raise exception 'bad argument: %', TG_ARGV[i];
                end if;
            end if;
        end loop;
    end;

    -- autodetect primary key columns unless pkey= was given
    if pkey_str is null then
        select array_agg(pk.attname)
          into pkey_list
          from (select k.attname
                  from pg_index i
                 inner join pg_attribute k
                    on k.attrelid = i.indexrelid
                 where i.indrelid = TG_RELID
                   and i.indisprimary
                   and k.attnum > 0
                   and not k.attisdropped
                 order by k.attnum) pk;
        if pkey_list is null then
            pkey_list := '{}';
            pkey_str := '';
        else
            pkey_str := array_to_string(pkey_list, ',');
        end if;
    end if;

    if pkey_str = '' and TG_OP in ('UPDATE', 'DELETE') then
        raise exception 'Update/Delete on table without pkey';
    end if;

    if TG_OP = 'INSERT' then
        ev_type := 'I';
    elsif TG_OP = 'UPDATE' then
        ev_type := 'U';
    elsif TG_OP = 'DELETE' then
        ev_type := 'D';
    elsif TG_OP = 'TRUNCATE' then
        ev_type := 'R';
    else
        raise exception 'TG_OP not supported: %', TG_OP;
    end if;

    if current_setting('session_replication_role') = 'local' then
        -- local changes are passed through without queueing anything
        if TG_WHEN = 'AFTER' or TG_OP = 'TRUNCATE' then
            return null;
        elsif TG_OP = 'DELETE' then
            return OLD;
        else
            return NEW;
        end if;
    elsif do_deny then
        raise exception 'Table ''%.%'' to queue ''%'': change not allowed (%)',
            TG_TABLE_SCHEMA, TG_TABLE_NAME, qname, TG_OP;
    elsif TG_OP = 'TRUNCATE' then
        -- no row data for TRUNCATE, event is sent with empty ev_data
        perform pgq.insert_event(qname, ev_type, '', ev_extra1, ev_extra2, ev_extra3, ev_extra4);
        return null;
    end if;

    -- process table columns
    declare
        attr record;
        pkey_sql_buf text[];
        qcol text;
        changed_sql text;
        ignore_sql text;
        ignore_sql_buf text[];
        pkey_change_sql text;
        pkey_col_changes int4 := 0;
        valexp text;
        sql1_buf text[] := '{}';    -- I:cols, U:vals, D:-
        sql2_buf text[] := '{}';    -- I:vals, U:pks,  D:pks
        sql1_buf_fallback text[] := '{}';
        has_changed boolean;
    begin
        for attr in
            select k.attnum, k.attname, k.atttypid
              from pg_attribute k
             where k.attrelid = TG_RELID and k.attnum > 0 and not k.attisdropped
             order by k.attnum
        loop
            qcol := quote_ident(attr.attname);

            -- NULL-safe probe: did this column change between OLD ($1) and NEW ($2)?
            -- Shared by the ignored-column and pkey-column checks below.
            changed_sql := 'select case when rold.' || qcol || ' is null and rnew.' || qcol || ' is null then false'
                || ' when rold.' || qcol || ' is null or rnew.' || qcol || ' is null then true'
                || ' else rold.' || qcol || ' <> rnew.' || qcol
                || ' end as is_changed '
                || 'from (select $1.*) rold, (select $2.*) rnew';

            if attr.attname = any (ignore_list) then
                ignore_sql_buf := array_append(ignore_sql_buf, changed_sql);
                continue;
            elsif attr.attname = any (extra_ignore_list) then
                -- magic column: strip the '_pgq_' prefix to get the event field name;
                -- prepended so that explicit colname=EXPR args are applied after it
                field_sql := array_prepend('select ' || quote_literal(substring(attr.attname from 6))
                    || '::text as key, (r.' || qcol || field_sql_sfx, field_sql);
                continue;
            end if;

            -- booleans are rendered as 't' / 'f'
            if attr.atttypid = 'boolean'::regtype::oid then
                valexp := 'case r.' || qcol || ' when true then ''t'' when false then ''f'' else null end';
            else
                valexp := 'r.' || qcol || '::text';
            end if;

            if attr.attname = any (pkey_list) then
                pkey_sql_buf := array_append(pkey_sql_buf, changed_sql);
                if TG_OP in ('UPDATE', 'DELETE') then
                    -- pkey columns go into the WHERE part only
                    sql2_buf := array_append(sql2_buf, 'select ' || quote_literal(qcol)
                        || ' || coalesce(''='' || quote_literal(' || valexp|| '), '' is null'') as val'
                        || ' from (select $1.*) r');
                    -- first pkey column doubles as the SET part when nothing else changed
                    if array_length(sql1_buf_fallback, 1) is null then
                        sql1_buf_fallback := array_append(sql1_buf_fallback, 'select ' || quote_literal(qcol || '=')
                            || ' || quote_nullable(' || valexp || ') as val'
                            || ' from (select $1.*) r');
                    end if;
                    continue;
                end if;
            end if;

            if TG_OP = 'INSERT' then
                sql1_buf := array_append(sql1_buf, qcol);
                sql2_buf := array_append(sql2_buf, 'select coalesce(quote_literal(' || valexp || '), ''null'') as val'
                    || ' from (select $1.*) r');
            elsif TG_OP = 'UPDATE' then
                -- only changed columns are put into the SET part
                execute 'select quote_nullable(rold.' || qcol || ') <> quote_nullable(rnew.' || qcol || ') as has_changed'
                    || ' from (select $1.*) rold, (select $2.*) rnew'
                    using OLD, NEW into has_changed;
                if has_changed then
                    sql1_buf := array_append(sql1_buf, 'select ' || quote_literal(qcol || '=')
                        || ' || quote_nullable(' || valexp || ') as val'
                        || ' from (select $1.*) r');
                end if;
            end if;
        end loop;

        -- A pkey= list that matches no table column would otherwise surface as
        -- an opaque dynamic-SQL failure (NULL or malformed query string) below.
        if TG_OP in ('UPDATE', 'DELETE') and array_length(sql2_buf, 1) is null then
            raise exception 'pkey columns not found in table: %', pkey_str;
        end if;

        -- SQL to see if pkey columns have changed
        if TG_OP = 'UPDATE' then
            pkey_change_sql := 'select count(1) from (' || array_to_string(pkey_sql_buf, ' union all ')
                || ') cols where cols.is_changed';
            execute pkey_change_sql using OLD, NEW into pkey_col_changes;
            if pkey_col_changes > 0 then
                raise exception 'primary key update not allowed';
            end if;
        end if;

        -- SQL to see if ignored columns have changed; guarded on the collected
        -- probes so an ignore= list naming no existing column does not end up
        -- executing a NULL query string
        if TG_OP = 'UPDATE' and array_length(ignore_sql_buf, 1) is not null then
            ignore_sql := 'select count(1) from (' || array_to_string(ignore_sql_buf, ' union all ')
                || ') cols where cols.is_changed';
            execute ignore_sql using OLD, NEW into ignore_col_changes;
        end if;

        -- SQL to load data
        if TG_OP = 'INSERT' then
            data_sql := 'select array_to_string(array[''('', '
                || quote_literal(array_to_string(sql1_buf, ','))
                || ', '') values ('','
                || '(select array_to_string(array_agg(s.val), '','') from (' || array_to_string(sql2_buf, ' union all ') || ') s)'
                || ', '')'''
                || '], '''')';
        elsif TG_OP = 'UPDATE' then
            if array_length(sql1_buf, 1) is null then
                sql1_buf := sql1_buf_fallback;
            end if;
            data_sql := 'select array_to_string(array['
                || '(select array_to_string(array_agg(s.val), '','') from (' || array_to_string(sql1_buf, ' union all ') || ') s)'
                || ', '' where '','
                || '(select array_to_string(array_agg(s.val), '' and '') from (' || array_to_string(sql2_buf, ' union all ') || ') s)'
                || '], '''')';
        else
            data_sql := 'select array_to_string(array['
                || '(select array_to_string(array_agg(s.val), '' and '') from (' || array_to_string(sql2_buf, ' union all ') || ') s)'
                || '], '''')';
        end if;
    end;

    -- render data
    declare
        old_data text;
    begin
        if TG_OP = 'INSERT' then
            execute data_sql using NEW into ev_data;
        elsif TG_OP = 'UPDATE' then
            -- render NEW
            execute data_sql using NEW into ev_data;

            -- render OLD when needed
            if do_backup or array_length(ignore_list, 1) is not null then
                execute data_sql using OLD into old_data;
            end if;

            -- only change was to ignored columns?
            if old_data = ev_data and ignore_col_changes > 0 then
                do_insert := false;
            end if;

            -- is backup needed?
            if do_backup then
                ev_extra2 := old_data;
            end if;
        elsif TG_OP = 'DELETE' then
            execute data_sql using OLD into ev_data;
        end if;
    end;

    -- apply magic args and columns
    declare
        col text;
        val text;
        rmain record;
        sql text;
    begin
        if do_insert and array_length(field_sql, 1) is not null then
            if TG_OP = 'DELETE' then
                rmain := OLD;
            else
                rmain := NEW;
            end if;

            sql := array_to_string(field_sql, ' union all ');
            for col, val in
                execute sql using rmain
            loop
                if col = 'ev_type' then
                    ev_type := val;
                elsif col = 'ev_extra1' then
                    ev_extra1 := val;
                elsif col = 'ev_extra2' then
                    ev_extra2 := val;
                elsif col = 'ev_extra3' then
                    ev_extra3 := val;
                elsif col = 'ev_extra4' then
                    ev_extra4 := val;
                elsif col = 'when' then
                    if val is null then
                        do_insert := false;
                    end if;
                end if;
            end loop;
        end if;
    end;

    -- insert final values
    if do_insert then
        perform pgq.insert_event(qname, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4);
    end if;

    if do_skip or TG_WHEN = 'AFTER' or TG_OP = 'TRUNCATE' then
        return null;
    elsif TG_OP = 'DELETE' then
        return OLD;
    else
        return NEW;
    end if;
end;
$$ language plpgsql;
|