File: 16_1_2_features.sql

package info (click to toggle)
pg-fact-loader 2.0.1-5
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,884 kB
  • sloc: sql: 28,911; sh: 157; makefile: 26
file content (405 lines) | stat: -rw-r--r-- 18,530 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
--Only send messages of level WARNING and above to the client
SET client_min_messages TO warning;
--This is for testing functionality of timezone-specific timestamps
SET TIMEZONE TO 'America/Chicago';

--Target table for the chained fact: same column layout as test_fact.orders_fact.
--It is filled by test_fact.orders_fact_chain_merge.
CREATE TABLE test_fact.orders_fact_chain (LIKE test_fact.orders_fact);

/****
First make a bad function def to test exception handling.

The WHERE clause compares the INT parameter with a JSONB NULL. PL/pgSQL does
not resolve that operator when the function is created, so the CREATE succeeds
and the statement only fails once the function is actually called.

NOTE(review): the body is deliberately left exactly as written - the failure
message inspected later in this script presumably contains the statement text
and its line number, so reformatting it may change the expected output. Confirm
against the expected file before touching it.
 */
CREATE FUNCTION test_fact.orders_fact_chain_merge(p_order_id INT)
RETURNS VOID AS
$BODY$
BEGIN

  INSERT INTO test_fact.orders_fact_chain
  SELECT * FROM test_fact.orders_fact WHERE p_order_id = NULL::JSONB;

END;
$BODY$
LANGUAGE plpgsql;

--Register orders_fact_chain with the loader, using priority 9
INSERT INTO fact_loader.fact_tables (fact_table_relid, priority)
SELECT 'test_fact.orders_fact_chain'::REGCLASS, 9;

/****
This example will use a local fact table as a queue table.
The audit table created here receives one row per change made to
test_fact.orders_fact (written by the row_audit_star trigger).
 */
CREATE SCHEMA IF NOT EXISTS test_fact_audit_raw;

CREATE TABLE test_fact_audit_raw.orders_fact_audit (
    orders_fact_audit_id bigserial,
    changed_at           timestamptz NOT NULL, --NOTE THE TIMESTAMPTZ
    operation            varchar(1) NOT NULL,
    row_before_change    jsonb,
    change               jsonb,
    primary_key          text,
    before_change        jsonb,
    PRIMARY KEY (orders_fact_audit_id)
);

--Audit trigger function for test_fact.orders_fact.
--Writes one row into test_fact_audit_raw.orders_fact_audit for each INSERT, UPDATE,
--DELETE or TRUNCATE it is fired for; any other TG_OP writes nothing.
--  TG_ARGV[0]    : name of the column whose value is stored in primary_key
--                  (NULL is stored when the row image has no such key)
--  before_change : for UPDATE the old values of the columns that differ, for DELETE
--                  the whole old row; every value is cut to its first 500 characters
--  change        : for UPDATE the new values of the columns that differ
--Always returns NULL.
CREATE OR REPLACE FUNCTION "test_fact_audit_raw"."audit_test_fact_orders_fact"()
RETURNS TRIGGER
LANGUAGE plpgsql
AS
$$
DECLARE
  value_row HSTORE = hstore(NULL);
  new_row HSTORE = hstore(NULL);
  audit_id BIGINT;
  before_json JSONB;
  change_json JSONB;
  pk_value TEXT;
BEGIN
  --The id is fetched before looking at TG_OP, so every call consumes a sequence value
  audit_id := nextval('test_fact_audit_raw.orders_fact_audit_orders_fact_audit_id_seq');

  CASE TG_OP
    WHEN 'UPDATE' THEN
      new_row := hstore(NEW);
      --Old values of the changed columns, truncated to 500 characters each
      SELECT hstore(array_agg(kv.key), array_agg(substring(kv.value FROM 1 FOR 500)))
        INTO value_row
      FROM each(hstore(OLD) - hstore(NEW)) AS kv;
      before_json := hstore_to_jsonb(value_row);
      change_json := hstore_to_jsonb(hstore(NEW) - hstore(OLD));
      --"->" yields NULL for a missing key, so no separate existence check is needed
      pk_value := new_row -> TG_ARGV[0];
    WHEN 'INSERT' THEN
      value_row := hstore(NEW);
      pk_value := value_row -> TG_ARGV[0];
    WHEN 'DELETE' THEN
      --Whole old row, truncated to 500 characters per value
      SELECT hstore(array_agg(kv.key), array_agg(substring(kv.value FROM 1 FOR 500)))
        INTO value_row
      FROM each(hstore(OLD)) AS kv;
      before_json := hstore_to_jsonb(value_row);
      pk_value := value_row -> TG_ARGV[0];
    WHEN 'TRUNCATE' THEN
      --Nothing to capture; the audit row carries only the operation
      NULL;
    ELSE
      RETURN NULL;
  END CASE;

  INSERT INTO "test_fact_audit_raw"."orders_fact_audit"
    ("orders_fact_audit_id", changed_at, operation, before_change, change, primary_key)
  VALUES
    (audit_id, now(), substring(TG_OP,1,1), before_json, change_json, pk_value);

  RETURN NULL;
END;
$$;

--Audit every row-level change on orders_fact; 'order_id' is handed to the function as TG_ARGV[0]
CREATE TRIGGER row_audit_star
AFTER INSERT OR UPDATE OR DELETE ON test_fact.orders_fact
FOR EACH ROW
EXECUTE PROCEDURE "test_fact_audit_raw"."audit_test_fact_orders_fact" ('order_id');

--Note that we DO NOT insert a pglogical_node_if_id - because this queue table is local
--Pair each relation in test_fact_audit_raw with its base relation by naming convention:
--  <schema>_audit_raw.<relname>_audit  is the queue of  <schema>.<relname>
WITH rels AS (
  SELECT c.oid AS relid, c.relname, n.nspname AS schemaname
  FROM pg_class c
  INNER JOIN pg_namespace n ON n.oid = c.relnamespace
)
INSERT INTO fact_loader.queue_tables (queue_table_relid, queue_of_base_table_relid, queue_table_tz)
SELECT audit_rel.relid::REGCLASS, base_rel.relid::REGCLASS, NULL
FROM rels audit_rel
INNER JOIN rels base_rel
  ON base_rel.schemaname||'_audit_raw' = audit_rel.schemaname
  AND base_rel.relname||'_audit' = audit_rel.relname
WHERE audit_rel.schemaname = 'test_fact_audit_raw';

--NOTE(review): judging by its name this adds batch id fields to the registered queue tables - confirm in the extension
SELECT fact_loader.add_batch_id_fields();

--Link every queue table whose base table is test_fact.orders_fact to the
--orders_fact_chain fact table. The same merge function is registered for
--inserts, updates and deletes.
INSERT INTO fact_loader.queue_table_deps
  (fact_table_id, queue_table_id, insert_merge_proid, update_merge_proid, delete_merge_proid)
SELECT
  ft.fact_table_id,
  qt.queue_table_id,
  'test_fact.orders_fact_chain_merge'::REGPROC,
  'test_fact.orders_fact_chain_merge'::REGPROC,
  'test_fact.orders_fact_chain_merge'::REGPROC
FROM fact_loader.fact_tables ft
CROSS JOIN fact_loader.queue_tables qt
WHERE ft.fact_table_relid = 'test_fact.orders_fact_chain'::REGCLASS
  AND qt.queue_of_base_table_relid IN ('test_fact.orders_fact'::REGCLASS);

  --Key retrieval for updates:
  --one level-1 sequence per dependency, returning order_id, flagged as the fact key, no filter scope
  INSERT INTO fact_loader.key_retrieval_sequences
    (queue_table_dep_id, filter_scope, level, return_columns, is_fact_key)
  SELECT
    qtd.queue_table_dep_id,
    NULL,
    1,
    '{order_id}'::name[],
    true
  FROM fact_loader.fact_tables ft
  INNER JOIN fact_loader.queue_table_deps qtd ON qtd.fact_table_id = ft.fact_table_id
  INNER JOIN fact_loader.queue_tables qt ON qt.queue_table_id = qtd.queue_table_id
  WHERE ft.fact_table_relid = 'test_fact.orders_fact_chain'::REGCLASS
    AND qt.queue_of_base_table_relid IN ('test_fact.orders_fact'::REGCLASS);

--Force orders_fact update
UPDATE test.orders SET total = 2010.00 WHERE order_id = 3;
--Leave only orders_fact enabled: the boolean expression is FALSE for every other fact table
UPDATE fact_loader.fact_tables SET enabled = (fact_table_relid = 'test_fact.orders_fact'::REGCLASS);
--NOTE(review): test.tick() is defined outside this file; presumably it advances the ticker the loader checks for non-local queues
SELECT test.tick();
SELECT fact_loader.worker();
SELECT order_id, customer_id, order_date, total, is_reorder
FROM test_fact.orders_fact
ORDER BY order_id;

--Don't tick - because this table is LOCAL and should update regardless of ticker.
--Disable everything, then enable only orders_fact_chain and flag it with force_worker_priority
UPDATE fact_loader.fact_tables SET enabled = FALSE;
UPDATE fact_loader.fact_tables SET force_worker_priority = TRUE, enabled = TRUE WHERE fact_table_relid = 'test_fact.orders_fact_chain'::REGCLASS;
--This should also return false in case of error
--(the merge function is still the deliberately broken version at this point)
SELECT fact_loader.worker();

--We should see an error now
--On server version 11 and later 'types' is rewritten to 'type(s)' in the message text,
--presumably so the output reads the same on every supported version - confirm against the expected file
SELECT fact_table_id,
  fact_table_relid,
  CASE WHEN current_setting('server_version_num')::INT >= 110000 THEN REPLACE(messages::TEXT, 'types', 'type(s)')::JSONB ELSE messages END
FROM fact_loader.unresolved_failures;

--No data
SELECT order_id, customer_id, order_date, total, is_reorder
FROM test_fact.orders_fact_chain
ORDER BY order_id;

--Let's fix the function def and re-run
--Corrected merge: copy the orders_fact row with the given order id into orders_fact_chain
CREATE OR REPLACE FUNCTION test_fact.orders_fact_chain_merge(p_order_id INT)
RETURNS VOID
LANGUAGE plpgsql
AS
$BODY$
BEGIN
  INSERT INTO test_fact.orders_fact_chain
  SELECT src.*
  FROM test_fact.orders_fact AS src
  WHERE src.order_id = p_order_id;
END;
$BODY$;

--Now re-enable and re-run
--NOTE(review): the explicit re-enable suggests the failed run left the job disabled - confirm in the extension
UPDATE fact_loader.fact_tables SET enabled = TRUE WHERE fact_table_relid = 'test_fact.orders_fact_chain'::REGCLASS;
SELECT fact_loader.worker();

--We should see nothing here now
SELECT fact_table_id,
  fact_table_relid,
  messages
FROM fact_loader.unresolved_failures;

--1 row
SELECT order_id, customer_id, order_date, total, is_reorder
FROM test_fact.orders_fact_chain
ORDER BY order_id;

--This is not a new feature, just new test coverage: testing concurrency.
--Session 1 runs in the background (trailing &): it calls the worker inside a transaction,
--stores the result in table try1 and keeps the transaction open for 2 seconds.
--Session 2 starts about 1 second later, while session 1 is still open, and stores its result in try2.
--The final 4 second sleep gives both sessions time to finish before the results are read.
\! psql contrib_regression -c 'BEGIN; SELECT fact_loader.worker() INTO try1; SELECT pg_sleep(2); COMMIT;' &
SELECT pg_sleep(1);
\! psql contrib_regression -c ' SELECT fact_loader.worker() INTO try2;'
SELECT pg_sleep(4);
SELECT * FROM try1;
SELECT * FROM try2;

--Daily schedule test - with range format suggestions included!!!
--This kind of table should have a gist exclusion index for the daterange but we won't do it in the test
CREATE TABLE test_fact.daily_customers_fact (LIKE test_fact.customers_fact);

--Add the validity range and key the table on (customer, range) in one ALTER
ALTER TABLE test_fact.daily_customers_fact
  ADD COLUMN as_of_date daterange,
  ADD PRIMARY KEY (customer_id, as_of_date);

--Brings test_fact.daily_customers_fact in line with test_fact.customers_fact:
--  1. collects the customers_fact rows that have no identical open-ended
--     (upper bound 'infinity') row in the daily table into temp table "changes"
--  2. ends the open-ended range of those customers at current_date
--  3. inserts a new range [current_date, infinity) carrying the current values
--Takes no arguments and returns nothing.
CREATE FUNCTION test_fact.daily_customers_fact_merge()
RETURNS VOID
LANGUAGE plpgsql
AS
$BODY$
BEGIN

  --Step 1: current rows minus the open-ended rows already stored
  DROP TABLE IF EXISTS changes;
  CREATE TEMP TABLE changes AS
    SELECT cf.customer_id, cf.phone, cf.age, cf.last_order_id, cf.order_product_count, cf.order_product_promo_ids
    FROM test_fact.customers_fact AS cf
    EXCEPT
    SELECT dcf.customer_id, dcf.phone, dcf.age, dcf.last_order_id, dcf.order_product_count, dcf.order_product_promo_ids
    FROM test_fact.daily_customers_fact AS dcf
    WHERE upper(dcf.as_of_date) = 'infinity';

  --Step 2: close the open-ended range of every changed customer
  UPDATE test_fact.daily_customers_fact AS dcf
  SET as_of_date = daterange(lower(dcf.as_of_date), current_date)
  WHERE upper(dcf.as_of_date) = 'infinity'
    AND dcf.customer_id IN (SELECT ch.customer_id FROM changes AS ch);

  --Step 3: open a new range from today; if that exact (customer_id, as_of_date)
  --key already exists, overwrite its attribute columns instead
  INSERT INTO test_fact.daily_customers_fact
    (as_of_date, customer_id, phone, age, last_order_id, order_product_count, order_product_promo_ids)
  SELECT
    daterange(current_date,'infinity') AS as_of_date,
    ch.customer_id,
    ch.phone,
    ch.age,
    ch.last_order_id,
    ch.order_product_count,
    ch.order_product_promo_ids
  FROM changes AS ch
  ON CONFLICT (customer_id, as_of_date)
  DO UPDATE
    SET phone = EXCLUDED.phone,
        age = EXCLUDED.age,
        last_order_id = EXCLUDED.last_order_id,
        order_product_count = EXCLUDED.order_product_count,
        order_product_promo_ids = EXCLUDED.order_product_promo_ids;

END;
$BODY$;

--Disable every fact table registered so far
UPDATE fact_loader.fact_tables SET enabled = FALSE;

BEGIN;  --Keep the same transaction time to make these tests possible

--Register the daily job with a scheduled time one second after this transaction's now(),
--in the America/Chicago zone, so it is not yet due
INSERT INTO fact_loader.fact_tables (fact_table_relid, enabled, priority, use_daily_schedule, daily_scheduled_time, daily_scheduled_tz, daily_scheduled_proid)
VALUES ('test_fact.daily_customers_fact', TRUE, 10, TRUE, now() + interval '1 second', 'America/Chicago', 'test_fact.daily_customers_fact_merge'::REGPROC);

--The row above is already inserted with enabled = TRUE; this sets the flag again explicitly
UPDATE fact_loader.fact_tables SET enabled = TRUE WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;

/*****
Dependent scheduled job - as of 1.3
Three throwaway tables, each paired with a no-argument function
that inserts a single row (id = 1) into it.
****/
CREATE TABLE silly (id int);
CREATE FUNCTION itran()
RETURNS VOID
LANGUAGE plpgsql
AS
$BODY$
BEGIN
  INSERT INTO silly (id) VALUES (1);
END;
$BODY$;

CREATE TABLE willy (id int);
CREATE FUNCTION itrantoo()
RETURNS VOID
LANGUAGE plpgsql
AS
$BODY$
BEGIN
  INSERT INTO willy (id) VALUES (1);
END;
$BODY$;

CREATE TABLE nilly (id int);
CREATE FUNCTION itrantootoo()
RETURNS VOID
LANGUAGE plpgsql
AS
$BODY$
BEGIN
  INSERT INTO nilly (id) VALUES (1);
END;
$BODY$;

--Chain of dependent daily jobs. All three use daily_customers_fact as their base job;
--the parent links are: silly -> daily_customers_fact, willy -> silly, nilly -> willy.
--Each statement has to run separately because it looks up the row inserted by the previous one.
INSERT INTO fact_loader.fact_tables
  (fact_table_relid, enabled, priority, use_daily_schedule, daily_scheduled_time, daily_scheduled_tz,
   daily_scheduled_proid, depends_on_base_daily_job_id, depends_on_parent_daily_job_id)
VALUES
  ('silly', TRUE, 11, TRUE, NULL, NULL, 'itran'::REGPROC,
   (SELECT base.fact_table_id FROM fact_loader.fact_tables base WHERE base.fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS),
   (SELECT parent.fact_table_id FROM fact_loader.fact_tables parent WHERE parent.fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS));

INSERT INTO fact_loader.fact_tables
  (fact_table_relid, enabled, priority, use_daily_schedule, daily_scheduled_time, daily_scheduled_tz,
   daily_scheduled_proid, depends_on_base_daily_job_id, depends_on_parent_daily_job_id)
VALUES
  ('willy', TRUE, 12, TRUE, NULL, NULL, 'itrantoo'::REGPROC,
   (SELECT base.fact_table_id FROM fact_loader.fact_tables base WHERE base.fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS),
   (SELECT parent.fact_table_id FROM fact_loader.fact_tables parent WHERE parent.fact_table_relid = 'silly'::REGCLASS));

INSERT INTO fact_loader.fact_tables
  (fact_table_relid, enabled, priority, use_daily_schedule, daily_scheduled_time, daily_scheduled_tz,
   daily_scheduled_proid, depends_on_base_daily_job_id, depends_on_parent_daily_job_id)
VALUES
  ('nilly', TRUE, 13, TRUE, NULL, NULL, 'itrantootoo'::REGPROC,
   (SELECT base.fact_table_id FROM fact_loader.fact_tables base WHERE base.fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS),
   (SELECT parent.fact_table_id FROM fact_loader.fact_tables parent WHERE parent.fact_table_relid = 'willy'::REGCLASS));

--BELOW we will try to run it only after our first one did successfully.

--Should not show the daily job because we set the daily schedule ahead in time
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

--Move the scheduled time to one second before this transaction's now() so the job becomes due
UPDATE fact_loader.fact_tables SET daily_scheduled_time = now() - interval '1 second' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;

--Now it should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

SELECT fact_loader.worker();

--We have to mock out the date so it appears the same any day we run this test
--(the lower bound is shifted so that current_date maps onto 2018-04-15; the upper bound is shown as stored)
SELECT daterange('2018-04-15'::DATE + (lower(as_of_date) - current_date),upper(as_of_date)), customer_id, phone, age, last_order_id, order_product_count, order_product_promo_ids
FROM test_fact.daily_customers_fact ORDER BY customer_id, as_of_date;

--Pretend we ran this yesterday
--(no WHERE clause: every row gets its lower bound moved back one day and an open-ended upper bound)
UPDATE test_fact.daily_customers_fact SET as_of_date = daterange(lower(as_of_date) - 1,'infinity');

--Job should not show because it just ran - but if it has dependent job it should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

--Pretend it ran yesterday
UPDATE fact_loader.fact_tables SET last_refresh_attempted_at = last_refresh_attempted_at - interval '1 day' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;

--Job should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

--Change something silly
--(gives the merge function a difference to pick up for customer 10)
UPDATE test_fact.customers_fact SET phone = NULL WHERE customer_id = 10;
SELECT fact_loader.worker();

--Job should not show because it just ran - but if it has dependent job it should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

--This should run the dependent job
SELECT fact_loader.worker();

--TABLE x is shorthand for SELECT * FROM x
TABLE silly;
TABLE willy;

--Now 2nd level dep should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
SELECT fact_loader.worker();
TABLE willy;
TABLE nilly;

--Now 3rd level dep should show
--Just check if enabling regular jobs is ok
--NOTE(review): ids 1 and 2 are hard-coded; presumably the non-daily fact tables created by earlier test files - confirm
UPDATE fact_loader.fact_tables SET enabled = true WHERE fact_table_id IN(1,2);
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
SELECT fact_loader.worker();
TABLE nilly;
--Switch the two regular jobs off again
UPDATE fact_loader.fact_tables SET enabled = false WHERE fact_table_id IN(1,2);

-- Need to test the next day's run when last_refresh_attempted_at is not null
--Move the last attempt of every daily-scheduled job back one day, then call the worker
--four times, checking the prioritized job list and the dependent tables along the way
UPDATE fact_loader.fact_tables SET last_refresh_attempted_at = last_refresh_attempted_at - interval '1 day' WHERE use_daily_schedule;
SELECT fact_loader.worker();
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
SELECT fact_loader.worker();
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
TABLE silly;
SELECT fact_loader.worker();
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
TABLE willy;
SELECT fact_loader.worker();
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
TABLE nilly;

--We should see one changed range
--We have to mock out the date so it appears the same any day we run this test
--(both bounds are shifted so that current_date maps onto 2018-04-15; an 'infinity' upper bound is kept as is)
SELECT daterange('2018-04-15'::DATE + (lower(as_of_date) - current_date),
                 CASE
                 WHEN upper(as_of_date) = 'infinity' THEN 'infinity'
                  ELSE
                 '2018-04-15'::DATE + (upper(as_of_date) - current_date) END),
  customer_id, phone, age, last_order_id, order_product_count, order_product_promo_ids
FROM test_fact.daily_customers_fact ORDER BY customer_id, as_of_date;

--Verify it still shows if we simulate a job failure
--(attempted just now, but flagged as not succeeded)
UPDATE fact_loader.fact_tables SET last_refresh_attempted_at = now(), last_refresh_succeeded = FALSE WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

--Here it should not show - if we mark that it did succeed
UPDATE fact_loader.fact_tables SET last_refresh_succeeded = TRUE WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

/*** TEST ADDING DEPS TO SCHEDULED JOBS ***/
--AGAIN Pretend it ran yesterday
UPDATE fact_loader.fact_tables SET last_refresh_attempted_at = last_refresh_attempted_at - interval '1 day' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;

--Job should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

--Change something silly
UPDATE test_fact.customers_fact SET phone = NULL WHERE customer_id = 10;

--Now add deps that are not met
--(three dependency tables with a delay tolerance of only 1 millisecond)
UPDATE fact_loader.fact_tables SET daily_scheduled_deps = ARRAY['test.customers'::REGCLASS,'test.orders'::REGCLASS, 'test_fact.customers_fact'::REGCLASS], daily_scheduled_dep_delay_tolerance = '1 millisecond' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;

--Should fail because no way they have been updated 1 millisecond ago
SELECT fact_loader.worker();

--We fail jobs that don't meet deps because as configured, it should be an exceptional occurrence and we want to raise an alarm.  Should show an error message containing "Delayed" lingo
SELECT fact_table_id FROM fact_loader.unresolved_failures WHERE messages ->> 'Message' LIKE '%Delayed%';

--Now make the tolerance such that we know the deps are met
--(same dependency list, tolerance widened to 1 minute; enabled is set back to TRUE,
-- presumably because the failed run disabled the job - confirm in the extension)
UPDATE fact_loader.fact_tables SET enabled = TRUE, daily_scheduled_deps = ARRAY['test.customers'::REGCLASS,'test.orders'::REGCLASS, 'test_fact.customers_fact'::REGCLASS], daily_scheduled_dep_delay_tolerance = '1 minute' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;

--Shows up again
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

--Succeeds
SELECT fact_loader.worker();

--Does not show now
SELECT fact_table_id FROM fact_loader.prioritized_jobs;

--Discard everything done since BEGIN, including the silly/willy/nilly fixtures and the daily job rows
ROLLBACK;