1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
|
create or replace function pgq.jsontriga() returns trigger as $$
-- ----------------------------------------------------------------------
-- Function: pgq.jsontriga()
--
-- Trigger function that puts row data in JSON-encoded form into queue.
--
-- Purpose:
-- Convert row data into easily parseable form.
--
-- Trigger parameters:
-- arg1 - queue name
-- argX - any number of optional arg, in any order
--
-- Optional arguments:
-- SKIP - The actual operation should be skipped (BEFORE trigger)
-- ignore=col1[,col2] - don't look at the specified arguments
-- pkey=col1[,col2] - Set pkey fields for the table, autodetection will be skipped
-- backup - Put urlencoded contents of old row to ev_extra2
-- colname=EXPR - Override field value with SQL expression. Can reference table
-- columns. colname can be: ev_type, ev_data, ev_extra1 .. ev_extra4
-- when=EXPR - If EXPR returns false, don't insert event.
--
-- Queue event fields:
-- ev_type - I/U/D ':' pkey_column_list
-- ev_data - column values urlencoded
-- ev_extra1 - table name
-- ev_extra2 - optional urlencoded backup
--
-- Regular listen trigger example:
-- > CREATE TRIGGER triga_nimi AFTER INSERT OR UPDATE ON customer
-- > FOR EACH ROW EXECUTE PROCEDURE pgq.jsontriga('qname');
--
-- Redirect trigger example:
-- > CREATE TRIGGER triga_nimi BEFORE INSERT OR UPDATE ON customer
-- > FOR EACH ROW EXECUTE PROCEDURE pgq.jsontriga('qname', 'SKIP');
-- ----------------------------------------------------------------------
declare
qname text;
ev_type text;
ev_data text;
ev_extra1 text;
ev_extra2 text;
ev_extra3 text;
ev_extra4 text;
do_skip boolean := false;
do_backup boolean := false;
do_insert boolean := true;
do_deny boolean := false;
extra_ignore_list text[];
full_ignore_list text[];
ignore_list text[] := '{}';
pkey_list text[];
pkey_str text;
field_sql_sfx text;
field_sql text[] := '{}';
data_sql text;
ignore_col_changes int4 := 0;
begin
if TG_NARGS < 1 then
raise exception 'Trigger needs queue name';
end if;
qname := TG_ARGV[0];
-- standard output
ev_extra1 := TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME;
-- prepare to handle magic fields
field_sql_sfx := ')::text as val from (select $1.*) r';
extra_ignore_list := array['_pgq_ev_type', '_pgq_ev_extra1', '_pgq_ev_extra2',
'_pgq_ev_extra3', '_pgq_ev_extra4']::text[];
-- parse trigger args
declare
got boolean;
argpair text[];
i integer;
begin
for i in 1 .. TG_NARGS-1 loop
if TG_ARGV[i] in ('skip', 'SKIP') then
do_skip := true;
elsif TG_ARGV[i] = 'backup' then
do_backup := true;
elsif TG_ARGV[i] = 'deny' then
do_deny := true;
else
got := false;
for argpair in select regexp_matches(TG_ARGV[i], '^([^=]+)=(.*)') loop
got := true;
if argpair[1] = 'pkey' then
pkey_str := argpair[2];
pkey_list := string_to_array(pkey_str, ',');
elsif argpair[1] = 'ignore' then
ignore_list := string_to_array(argpair[2], ',');
elsif argpair[1] ~ '^ev_(type|extra[1-4])$' then
field_sql := array_append(field_sql, 'select ' || quote_literal(argpair[1])
|| '::text as key, (' || argpair[2] || field_sql_sfx);
elsif argpair[1] = 'when' then
field_sql := array_append(field_sql, 'select ' || quote_literal(argpair[1])
|| '::text as key, (case when (' || argpair[2]
|| ')::boolean then ''proceed'' else null end' || field_sql_sfx);
else
got := false;
end if;
end loop;
if not got then
raise exception 'bad argument: %', TG_ARGV[i];
end if;
end if;
end loop;
end;
full_ignore_list := ignore_list || extra_ignore_list;
if pkey_str is null then
select array_agg(pk.attname)
from (select k.attname from pg_index i, pg_attribute k
where i.indrelid = TG_RELID
and k.attrelid = i.indexrelid and i.indisprimary
and k.attnum > 0 and not k.attisdropped
order by k.attnum) pk
into pkey_list;
if pkey_list is null then
pkey_list := '{}';
pkey_str := '';
else
pkey_str := array_to_string(pkey_list, ',');
end if;
end if;
if pkey_str = '' and TG_OP in ('UPDATE', 'DELETE') then
raise exception 'Update/Delete on table without pkey';
end if;
if TG_OP not in ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE') then
raise exception 'TG_OP not supported: %', TG_OP;
end if;
-- fill ev_type
select to_json(t.*)::text
from (select TG_OP as op, array[TG_TABLE_SCHEMA,TG_TABLE_NAME] as "table", pkey_list as "pkey") t
into ev_type;
-- early exit?
if current_setting('session_replication_role') = 'local' then
if TG_WHEN = 'AFTER' or TG_OP = 'TRUNCATE' then
return null;
elsif TG_OP = 'DELETE' then
return OLD;
else
return NEW;
end if;
elsif do_deny then
raise exception 'Table ''%.%'' to queue ''%'': change not allowed (%)',
TG_TABLE_SCHEMA, TG_TABLE_NAME, qname, TG_OP;
elsif TG_OP = 'TRUNCATE' then
perform pgq.insert_event(qname, ev_type, '{}', ev_extra1, ev_extra2, ev_extra3, ev_extra4);
return null;
end if;
-- process table columns
declare
attr record;
pkey_sql_buf text[];
qcol text;
data_sql_buf text[];
ignore_sql text;
ignore_sql_buf text[];
pkey_change_sql text;
pkey_col_changes int4 := 0;
valexp text;
begin
for attr in
select k.attnum, k.attname, k.atttypid
from pg_attribute k
where k.attrelid = TG_RELID and k.attnum > 0 and not k.attisdropped
order by k.attnum
loop
qcol := quote_ident(attr.attname);
if attr.attname = any (ignore_list) then
ignore_sql_buf := array_append(ignore_sql_buf,
'select case when rold.' || qcol || ' is null and rnew.' || qcol || ' is null then false'
|| ' when rold.' || qcol || ' is null or rnew.' || qcol || ' is null then true'
|| ' else rold.' || qcol || ' <> rnew.' || qcol
|| ' end as is_changed '
|| 'from (select $1.*) rold, (select $2.*) rnew');
continue;
elsif attr.attname = any (extra_ignore_list) then
field_sql := array_prepend('select ' || quote_literal(substring(attr.attname from 6))
|| '::text as key, (r.' || qcol || field_sql_sfx, field_sql);
continue;
end if;
-- force cast to text or not
if attr.atttypid in ('timestamptz'::regtype::oid, 'timestamp'::regtype::oid,
'int8'::regtype::oid, 'int4'::regtype::oid, 'int2'::regtype::oid,
'date'::regtype::oid, 'boolean'::regtype::oid) then
valexp := 'to_json(r.' || qcol || ')::text';
else
valexp := 'to_json(r.' || qcol || '::text)::text';
end if;
if attr.attname = any (pkey_list) then
pkey_sql_buf := array_append(pkey_sql_buf,
'select case when rold.' || qcol || ' is null and rnew.' || qcol || ' is null then false'
|| ' when rold.' || qcol || ' is null or rnew.' || qcol || ' is null then true'
|| ' else rold.' || qcol || ' <> rnew.' || qcol
|| ' end as is_changed '
|| 'from (select $1.*) rold, (select $2.*) rnew');
end if;
data_sql_buf := array_append(data_sql_buf,
'select ' || quote_literal(to_json(attr.attname) || ':')
|| ' || coalesce(' || valexp || ', ''null'') as jpair from (select $1.*) r');
end loop;
-- SQL to see if pkey columns have changed
if TG_OP = 'UPDATE' then
pkey_change_sql := 'select count(1) from (' || array_to_string(pkey_sql_buf, ' union all ')
|| ') cols where cols.is_changed';
execute pkey_change_sql using OLD, NEW into pkey_col_changes;
if pkey_col_changes > 0 then
raise exception 'primary key update not allowed';
end if;
end if;
-- SQL to see if ignored columns have changed
if TG_OP = 'UPDATE' and array_length(ignore_list, 1) is not null then
ignore_sql := 'select count(1) from (' || array_to_string(ignore_sql_buf, ' union all ')
|| ') cols where cols.is_changed';
execute ignore_sql using OLD, NEW into ignore_col_changes;
end if;
-- SQL to load data
data_sql := 'select ''{'' || array_to_string(array_agg(cols.jpair), '','') || ''}'' from ('
|| array_to_string(data_sql_buf, ' union all ') || ') cols';
end;
-- render data
declare
old_data text;
begin
if TG_OP = 'INSERT' then
execute data_sql using NEW into ev_data;
elsif TG_OP = 'UPDATE' then
-- render NEW
execute data_sql using NEW into ev_data;
-- render OLD when needed
if do_backup or array_length(ignore_list, 1) is not null then
execute data_sql using OLD into old_data;
end if;
-- only change was to ignored columns?
if old_data = ev_data and ignore_col_changes > 0 then
do_insert := false;
end if;
-- is backup needed?
if do_backup then
ev_extra2 := old_data;
end if;
elsif TG_OP = 'DELETE' then
execute data_sql using OLD into ev_data;
end if;
end;
-- apply magic args and columns
declare
col text;
val text;
rmain record;
sql text;
begin
if do_insert and array_length(field_sql, 1) is not null then
if TG_OP = 'DELETE' then
rmain := OLD;
else
rmain := NEW;
end if;
sql := array_to_string(field_sql, ' union all ');
for col, val in
execute sql using rmain
loop
if col = 'ev_type' then
ev_type := val;
elsif col = 'ev_extra1' then
ev_extra1 := val;
elsif col = 'ev_extra2' then
ev_extra2 := val;
elsif col = 'ev_extra3' then
ev_extra3 := val;
elsif col = 'ev_extra4' then
ev_extra4 := val;
elsif col = 'when' then
if val is null then
do_insert := false;
end if;
end if;
end loop;
end if;
end;
-- insert final values
if do_insert then
perform pgq.insert_event(qname, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4);
end if;
if do_skip or TG_WHEN = 'AFTER' or TG_OP = 'TRUNCATE' then
return null;
elsif TG_OP = 'DELETE' then
return OLD;
else
return NEW;
end if;
end;
$$ language plpgsql;
|