File: jsontriga.sql

package info (click to toggle)
pgq 3.5.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 808 kB
  • sloc: sql: 3,442; ansic: 2,013; python: 309; makefile: 84; sh: 1
file content (318 lines) | stat: -rw-r--r-- 12,206 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
create or replace function pgq.jsontriga() returns trigger as $$
-- ----------------------------------------------------------------------
-- Function: pgq.jsontriga()
--
--      Trigger function that puts row data in JSON-encoded form into queue.
--
-- Purpose:
--      Convert row data into easily parseable form.
--
-- Trigger parameters:
--      arg1 - queue name
--      argX - any number of optional arg, in any order
--
-- Optional arguments:
--      SKIP                - The actual operation should be skipped (BEFORE trigger)
--      ignore=col1[,col2]  - don't look at the specified columns
--      pkey=col1[,col2]    - Set pkey fields for the table, autodetection will be skipped
--      backup              - Put JSON-encoded contents of old row to ev_extra2 (UPDATE only)
--      deny                - Raise exception on any change, unless
--                            session_replication_role is 'local'
--      colname=EXPR        - Override field value with SQL expression.  Can reference table
--                            columns.  colname can be: ev_type, ev_extra1 .. ev_extra4
--      when=EXPR           - If EXPR is not true, don't insert event.
--
-- Magic columns:
--      Table columns named _pgq_ev_type, _pgq_ev_extra1 .. _pgq_ev_extra4 are
--      left out of ev_data; their value overrides the matching event field.
--
-- Queue event fields:
--      ev_type      - JSON object: {"op": TG_OP, "table": [schema, name], "pkey": [cols]}
--      ev_data      - JSON object with column values ('{}' for TRUNCATE)
--      ev_extra1    - table name (schema.table)
--      ev_extra2    - optional JSON-encoded backup of old row
--
-- Regular listen trigger example:
-- >   CREATE TRIGGER triga_nimi AFTER INSERT OR UPDATE ON customer
-- >   FOR EACH ROW EXECUTE PROCEDURE pgq.jsontriga('qname');
--
-- Redirect trigger example:
-- >   CREATE TRIGGER triga_nimi BEFORE INSERT OR UPDATE ON customer
-- >   FOR EACH ROW EXECUTE PROCEDURE pgq.jsontriga('qname', 'SKIP');
-- ----------------------------------------------------------------------
declare
    qname text;
    ev_type text;
    ev_data text;
    ev_extra1 text;
    ev_extra2 text;
    ev_extra3 text;
    ev_extra4 text;
    do_skip boolean := false;
    do_backup boolean := false;
    do_insert boolean := true;
    do_deny boolean := false;
    extra_ignore_list text[];
    ignore_list text[] := '{}';
    pkey_list text[];
    pkey_str text;
    field_sql_sfx text;
    field_sql text[] := '{}';
    data_sql text;
    ignore_col_changes int4 := 0;
begin
    if TG_NARGS < 1 then
        raise exception 'Trigger needs queue name';
    end if;
    qname := TG_ARGV[0];

    -- standard output
    ev_extra1 := TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME;

    -- prepare to handle magic fields
    -- (every field_sql entry is 'select <key>, (<expr>' followed by this suffix;
    --  $1 is bound to the row being processed)
    field_sql_sfx := ')::text as val from (select $1.*) r';
    extra_ignore_list := array['_pgq_ev_type', '_pgq_ev_extra1', '_pgq_ev_extra2',
                               '_pgq_ev_extra3', '_pgq_ev_extra4']::text[];

    -- parse trigger args
    declare
        got boolean;
        argpair text[];
    begin
        for i in 1 .. TG_NARGS-1 loop
            if TG_ARGV[i] in ('skip', 'SKIP') then
                do_skip := true;
            elsif TG_ARGV[i] = 'backup' then
                do_backup := true;
            elsif TG_ARGV[i] = 'deny' then
                do_deny := true;
            else
                -- key=value style argument; anything unrecognized is an error
                got := false;
                for argpair in select regexp_matches(TG_ARGV[i], '^([^=]+)=(.*)') loop
                    got := true;
                    if argpair[1] = 'pkey' then
                        pkey_str := argpair[2];
                        pkey_list := string_to_array(pkey_str, ',');
                    elsif argpair[1] = 'ignore' then
                        ignore_list := string_to_array(argpair[2], ',');
                    elsif argpair[1] ~ '^ev_(type|extra[1-4])$' then
                        field_sql := array_append(field_sql, 'select ' || quote_literal(argpair[1])
                                                  || '::text as key, (' || argpair[2] || field_sql_sfx);
                    elsif argpair[1] = 'when' then
                        -- yields 'proceed' when EXPR is true, NULL otherwise
                        field_sql := array_append(field_sql, 'select ' || quote_literal(argpair[1])
                                                  || '::text as key, (case when (' || argpair[2]
                                                  || ')::boolean then ''proceed'' else null end' || field_sql_sfx);
                    else
                        got := false;
                    end if;
                end loop;
                if not got then
                    raise exception 'bad argument: %', TG_ARGV[i];
                end if;
            end if;
        end loop;
    end;

    -- autodetect pkey from the primary key index unless given as argument
    if pkey_str is null then
        select array_agg(pk.attname)
            from (select k.attname
                    from pg_index i
                    join pg_attribute k on k.attrelid = i.indexrelid
                    where i.indrelid = TG_RELID
                        and i.indisprimary
                        and k.attnum > 0 and not k.attisdropped
                    order by k.attnum) pk
            into pkey_list;
        if pkey_list is null then
            pkey_list := '{}';
            pkey_str := '';
        else
            pkey_str := array_to_string(pkey_list, ',');
        end if;
    end if;
    if pkey_str = '' and TG_OP in ('UPDATE', 'DELETE') then
        raise exception 'Update/Delete on table without pkey';
    end if;

    if TG_OP not in ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE') then
        raise exception 'TG_OP not supported: %', TG_OP;
    end if;

    -- fill ev_type
    select to_json(t.*)::text
        from (select TG_OP as op, array[TG_TABLE_SCHEMA,TG_TABLE_NAME] as "table", pkey_list as "pkey") t
        into ev_type;

    -- early exit?
    if current_setting('session_replication_role') = 'local' then
        if TG_WHEN = 'AFTER' or TG_OP = 'TRUNCATE' then
            return null;
        elsif TG_OP = 'DELETE' then
            return OLD;
        else
            return NEW;
        end if;
    elsif do_deny then
        raise exception 'Table ''%.%'' to queue ''%'': change not allowed (%)',
                    TG_TABLE_SCHEMA, TG_TABLE_NAME, qname, TG_OP;
    elsif TG_OP = 'TRUNCATE' then
        perform pgq.insert_event(qname, ev_type, '{}', ev_extra1, ev_extra2, ev_extra3, ev_extra4);
        return null;
    end if;

    -- process table columns
    declare
        attr record;
        pkey_sql_buf text[];
        qcol text;
        changed_sql text;
        data_sql_buf text[];
        ignore_sql text;
        ignore_sql_buf text[];
        pkey_change_sql text;
        pkey_col_changes int4 := 0;
        valexp text;
    begin
        for attr in
            select k.attnum, k.attname, k.atttypid
                from pg_attribute k
                where k.attrelid = TG_RELID and k.attnum > 0 and not k.attisdropped
                order by k.attnum
        loop
            qcol := quote_ident(attr.attname);

            -- NULL-safe "did this column change" probe; $1 = OLD row, $2 = NEW row
            changed_sql := 'select case when rold.' || qcol || ' is null and rnew.' || qcol || ' is null then false'
                        || ' when rold.' || qcol || ' is null or rnew.' || qcol || ' is null then true'
                        || ' else rold.' || qcol || ' <> rnew.' || qcol
                        || ' end as is_changed '
                        || 'from (select $1.*) rold, (select $2.*) rnew';

            if attr.attname = any (ignore_list) then
                -- ignored column: track changes, keep it out of ev_data
                ignore_sql_buf := array_append(ignore_sql_buf, changed_sql);
                continue;
            elsif attr.attname = any (extra_ignore_list) then
                -- magic column: strip the '_pgq_' prefix to get the event field name;
                -- prepended so that trigger-argument overrides are applied after it
                field_sql := array_prepend('select ' || quote_literal(substring(attr.attname from 6))
                                           || '::text as key, (r.' || qcol || field_sql_sfx, field_sql);
                continue;
            end if;

            -- force cast to text or not
            if attr.atttypid in ('timestamptz'::regtype::oid, 'timestamp'::regtype::oid,
                    'int8'::regtype::oid, 'int4'::regtype::oid, 'int2'::regtype::oid,
                    'date'::regtype::oid, 'boolean'::regtype::oid) then
                valexp := 'to_json(r.' || qcol || ')::text';
            else
                valexp := 'to_json(r.' || qcol || '::text)::text';
            end if;

            if attr.attname = any (pkey_list) then
                pkey_sql_buf := array_append(pkey_sql_buf, changed_sql);
            end if;

            -- one '"colname":value' pair per column; SQL NULL is rendered as JSON null
            data_sql_buf := array_append(data_sql_buf,
                    'select ' || quote_literal(to_json(attr.attname) || ':')
                    || ' || coalesce(' || valexp || ', ''null'') as jpair from (select $1.*) r');
        end loop;

        -- SQL to see if pkey columns have changed
        if TG_OP = 'UPDATE' then
            -- none of the pkey columns is a usable table column: fail with a clear
            -- message instead of passing a NULL query string to EXECUTE
            if pkey_sql_buf is null then
                raise exception 'Update on table with unusable pkey columns: %', pkey_str;
            end if;
            pkey_change_sql := 'select count(1) from (' || array_to_string(pkey_sql_buf, ' union all ')
                            || ') cols where cols.is_changed';
            execute pkey_change_sql using OLD, NEW into pkey_col_changes;
            if pkey_col_changes > 0 then
                raise exception 'primary key update not allowed';
            end if;
        end if;

        -- SQL to see if ignored columns have changed
        -- (buffer stays NULL when no ignore= name matched a table column;
        --  then there is nothing to check and ignore_col_changes stays 0)
        if TG_OP = 'UPDATE' and ignore_sql_buf is not null then
            ignore_sql := 'select count(1) from (' || array_to_string(ignore_sql_buf, ' union all ')
                || ') cols where cols.is_changed';
            execute ignore_sql using OLD, NEW into ignore_col_changes;
        end if;

        -- SQL to load data
        if data_sql_buf is null then
            -- no column left to publish (all ignored or magic): empty JSON object
            data_sql := 'select ''{}''::text';
        else
            data_sql := 'select ''{'' || array_to_string(array_agg(cols.jpair), '','') || ''}'' from ('
                     || array_to_string(data_sql_buf, ' union all ') || ') cols';
        end if;
    end;

    -- render data
    declare
        old_data text;
    begin
        if TG_OP = 'INSERT' then
            execute data_sql using NEW into ev_data;
        elsif TG_OP = 'UPDATE' then

            -- render NEW
            execute data_sql using NEW into ev_data;

            -- render OLD when needed: for backup, or to decide whether
            -- ignored columns were the only thing that changed
            if do_backup or ignore_col_changes > 0 then
                execute data_sql using OLD into old_data;
            end if;

            -- only change was to ignored columns?
            if old_data = ev_data and ignore_col_changes > 0 then
                do_insert := false;
            end if;

            -- is backup needed?
            if do_backup then
                ev_extra2 := old_data;
            end if;
        elsif TG_OP = 'DELETE' then
            execute data_sql using OLD into ev_data;
        end if;
    end;

    -- apply magic args and columns
    declare
        col text;
        val text;
        rmain record;
        sql text;
    begin
        if do_insert and array_length(field_sql, 1) is not null then
            -- expressions are evaluated against OLD on DELETE, NEW otherwise
            if TG_OP = 'DELETE' then
                rmain := OLD;
            else
                rmain := NEW;
            end if;

            sql := array_to_string(field_sql, ' union all ');
            for col, val in
                execute sql using rmain
            loop
                if col = 'ev_type' then
                    ev_type := val;
                elsif col = 'ev_extra1' then
                    ev_extra1 := val;
                elsif col = 'ev_extra2' then
                    ev_extra2 := val;
                elsif col = 'ev_extra3' then
                    ev_extra3 := val;
                elsif col = 'ev_extra4' then
                    ev_extra4 := val;
                elsif col = 'when' then
                    -- NULL means the when= expression was not true
                    if val is null then
                        do_insert := false;
                    end if;
                end if;
            end loop;
        end if;
    end;

    -- insert final values
    if do_insert then
        perform pgq.insert_event(qname, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4);
    end if;

    -- returning NULL from a BEFORE row trigger suppresses the operation (SKIP)
    if do_skip or TG_WHEN = 'AFTER' or TG_OP = 'TRUNCATE' then
        return null;
    elsif TG_OP = 'DELETE' then
        return OLD;
    else
        return NEW;
    end if;
end;
$$ language plpgsql;