1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405
|
--Only report WARNING and above, suppressing NOTICE-level messages (e.g. "... does not exist, skipping")
SET client_min_messages TO warning;
--Fix the session time zone: this script tests time-zone-aware (timestamptz) queue timestamps
SET TIMEZONE TO 'America/Chicago';
--Target of the chained (fact-on-fact) test. LIKE without INCLUDING copies the column
--definitions of test_fact.orders_fact but no indexes, so this table has no primary key.
CREATE TABLE test_fact.orders_fact_chain (LIKE test_fact.orders_fact);
/****
First make a deliberately bad function definition to test exception handling.
The WHERE clause compares the INT parameter with a JSONB value (no integer = jsonb
operator exists), so the call is expected to fail when the worker runs it and a
failure should be recorded. A working version replaces this function further down.
*/
CREATE FUNCTION test_fact.orders_fact_chain_merge(p_order_id INT)
RETURNS VOID AS
$BODY$
BEGIN
INSERT INTO test_fact.orders_fact_chain
SELECT * FROM test_fact.orders_fact WHERE p_order_id = NULL::JSONB;
END;
$BODY$
LANGUAGE plpgsql;
--Register the chain table with fact_loader (priority 9)
INSERT INTO fact_loader.fact_tables
(fact_table_relid, priority)
VALUES ('test_fact.orders_fact_chain'::REGCLASS, 9);
/****
This example uses a local audit table as the queue table: a row-level trigger on
test_fact.orders_fact (created below) writes every row change into
test_fact_audit_raw.orders_fact_audit.
*/
CREATE SCHEMA IF NOT EXISTS test_fact_audit_raw;
CREATE TABLE test_fact_audit_raw.orders_fact_audit (
orders_fact_audit_id BIGSERIAL PRIMARY KEY,
changed_at timestamp with time zone NOT NULL, --NOTE THE TIMESTAMPTZ (the queue table is registered with queue_table_tz = NULL)
operation character varying(1) NOT NULL,
row_before_change jsonb, --not written by the audit trigger function below
change jsonb,
primary_key text,
before_change jsonb
);
--Row-level audit trigger: writes one row into test_fact_audit_raw.orders_fact_audit per change.
--  UPDATE:   before_change = OLD values that changed (each truncated to 500 chars),
--            change        = NEW values that differ from OLD,
--            primary_key   = NEW value of the column named by TG_ARGV[0].
--  INSERT:   only primary_key (from NEW) is recorded.
--  DELETE:   before_change = whole OLD row (values truncated to 500 chars),
--            primary_key   = taken from that truncated OLD row.
--  TRUNCATE: only the operation letter is recorded.
--Any other TG_OP returns without writing. primary_key is NULL when TG_ARGV[0]
--is missing or names no column (hstore -> yields NULL for an absent key).
--Always returns NULL (the result of an AFTER row trigger is ignored).
CREATE OR REPLACE FUNCTION "test_fact_audit_raw"."audit_test_fact_orders_fact"()
RETURNS TRIGGER AS
$$
DECLARE
trimmed HSTORE;
key_value TEXT;
before_json JSONB;
change_json JSONB;
new_audit_id BIGINT;
BEGIN
--Drawn before branching, so every invocation consumes one sequence value
new_audit_id := nextval('test_fact_audit_raw.orders_fact_audit_orders_fact_audit_id_seq');

CASE TG_OP
WHEN 'UPDATE' THEN
  SELECT hstore(array_agg(sq.key), array_agg(sq.value))
    INTO trimmed
    FROM (SELECT (each(h.h)).key AS key,
                 substring((each(h.h)).value FROM 1 FOR 500) AS value
            FROM (SELECT hstore(OLD) - hstore(NEW) AS h) h) sq;
  before_json := hstore_to_jsonb(trimmed);
  change_json := hstore_to_jsonb(hstore(NEW) - hstore(OLD));
  key_value   := hstore(NEW) -> TG_ARGV[0];
WHEN 'INSERT' THEN
  key_value := hstore(NEW) -> TG_ARGV[0];
WHEN 'DELETE' THEN
  SELECT hstore(array_agg(sq.key), array_agg(sq.value))
    INTO trimmed
    FROM (SELECT (each(h)).key AS key,
                 substring((each(h)).value FROM 1 FOR 500) AS value
            FROM hstore(OLD) h) sq;
  before_json := hstore_to_jsonb(trimmed);
  key_value   := trimmed -> TG_ARGV[0];
WHEN 'TRUNCATE' THEN
  NULL; --nothing beyond the operation letter is recorded
ELSE
  RETURN NULL;
END CASE;

INSERT INTO "test_fact_audit_raw"."orders_fact_audit"("orders_fact_audit_id", changed_at, operation, before_change, change, primary_key)
VALUES(new_audit_id, now(), substring(TG_OP,1,1), before_json, change_json, key_value);
RETURN NULL;
END;
$$
LANGUAGE plpgsql;
--Audit every row change on test_fact.orders_fact; 'order_id' arrives as TG_ARGV[0] and is stored in primary_key
CREATE TRIGGER row_audit_star AFTER INSERT OR DELETE OR UPDATE ON test_fact.orders_fact FOR EACH ROW EXECUTE PROCEDURE "test_fact_audit_raw"."audit_test_fact_orders_fact" ('order_id');
--Note that we DO NOT insert a pglogical_node_if_id - because this queue table is local
--Pair every table in schema test_fact_audit_raw with its base table by naming convention:
--  <schema>_audit_raw.<table>_audit  is the queue of  <schema>.<table>
--(here test_fact_audit_raw.orders_fact_audit -> test_fact.orders_fact). queue_table_tz is left NULL.
INSERT INTO fact_loader.queue_tables (queue_table_relid, queue_of_base_table_relid, queue_table_tz)
SELECT st.relid::REGCLASS, sts.relid::REGCLASS, NULL
FROM (SELECT c.oid AS relid, c.relname, n.nspname AS schemaname FROM pg_class c INNER JOIN pg_namespace n ON n.oid = c.relnamespace) st
INNER JOIN (SELECT c.oid AS relid, c.relname, n.nspname AS schemaname FROM pg_class c INNER JOIN pg_namespace n ON n.oid = c.relnamespace) sts ON sts.schemaname||'_audit_raw' = st.schemaname AND sts.relname||'_audit' = st.relname
WHERE st.schemaname = 'test_fact_audit_raw';
--NOTE(review): presumably adds fact_loader's batch-tracking column(s) to the registered queue tables - confirm in fact_loader
SELECT fact_loader.add_batch_id_fields();
--Wire test_fact.orders_fact_chain to the queue of test_fact.orders_fact.
--The same merge function is registered for inserts, updates and deletes.
WITH queue_tables_with_proids AS (
SELECT
*,
'test_fact.orders_fact_chain_merge'::REGPROC AS insert_merge_proid,
'test_fact.orders_fact_chain_merge'::REGPROC AS update_merge_proid,
'test_fact.orders_fact_chain_merge'::REGPROC AS delete_merge_proid
FROM fact_loader.queue_tables
WHERE queue_of_base_table_relid IN
/***
These are the tables that are involved in test_fact.orders_fact_chain_merge.
Work this out for each merge function so that every change that could affect
the fact table is configured.
*/
('test_fact.orders_fact'::REGCLASS)
)
INSERT INTO fact_loader.queue_table_deps
(fact_table_id, queue_table_id, insert_merge_proid, update_merge_proid, delete_merge_proid)
SELECT
fact_table_id, queue_tables_with_proids.queue_table_id, insert_merge_proid, update_merge_proid, delete_merge_proid
FROM fact_loader.fact_tables
CROSS JOIN queue_tables_with_proids
WHERE fact_table_relid = 'test_fact.orders_fact_chain'::REGCLASS;
--Key retrieval for updates
--Single level (level = 1, no filter_scope): return order_id from the queue and treat it
--as the fact key (is_fact_key = true) for the orders_fact -> orders_fact_chain dependency.
INSERT INTO fact_loader.key_retrieval_sequences (
queue_table_dep_id,
filter_scope,
level,
return_columns,
is_fact_key)
SELECT
queue_table_dep_id,
NULL,
1,
'{order_id}'::name[],
true
FROM fact_loader.queue_table_deps qtd
INNER JOIN fact_loader.queue_tables qt USING (queue_table_id)
INNER JOIN fact_loader.fact_tables ft USING (fact_table_id)
WHERE fact_table_relid = 'test_fact.orders_fact_chain'::REGCLASS
AND queue_of_base_table_relid IN('test_fact.orders_fact'::REGCLASS);
--Force orders_fact update
UPDATE test.orders SET total = 2010.00 WHERE order_id = 3;
--Enable only test_fact.orders_fact and run it once; its changes are captured by the audit trigger
UPDATE fact_loader.fact_tables SET enabled = (fact_table_relid = 'test_fact.orders_fact'::REGCLASS);
SELECT test.tick();
SELECT fact_loader.worker();
SELECT order_id, customer_id, order_date, total, is_reorder
FROM test_fact.orders_fact
ORDER BY order_id;
--No test.tick() here: the audit queue table is LOCAL, so its changes should be processed regardless of the ticker.
UPDATE fact_loader.fact_tables SET enabled = FALSE;
UPDATE fact_loader.fact_tables SET force_worker_priority = TRUE, enabled = TRUE WHERE fact_table_relid = 'test_fact.orders_fact_chain'::REGCLASS;
--The merge function is still the broken version; the worker should also return false in case of error
SELECT fact_loader.worker();
--We should see an error now.
--On server_version_num >= 110000 the message text has 'types' rewritten to 'type(s)', so one expected output fits all versions.
SELECT fact_table_id,
fact_table_relid,
CASE WHEN current_setting('server_version_num')::INT >= 110000 THEN REPLACE(messages::TEXT, 'types', 'type(s)')::JSONB ELSE messages END
FROM fact_loader.unresolved_failures;
--No data: the failed merge inserted nothing
SELECT order_id, customer_id, order_date, total, is_reorder
FROM test_fact.orders_fact_chain
ORDER BY order_id;
--Let's fix the function def and re-run.
--This merge is registered as the insert, update AND delete handler for the chain table,
--so it replaces (rather than appends) the chain rows of the given order:
--  * calling it again for the same order does not duplicate rows (the chain table has no PK);
--  * if the order no longer exists in test_fact.orders_fact, its chain rows are removed.
CREATE OR REPLACE FUNCTION test_fact.orders_fact_chain_merge(p_order_id INT)
RETURNS VOID AS
$BODY$
BEGIN
DELETE FROM test_fact.orders_fact_chain WHERE order_id = p_order_id;
INSERT INTO test_fact.orders_fact_chain
SELECT * FROM test_fact.orders_fact WHERE order_id = p_order_id;
END;
$BODY$
LANGUAGE plpgsql;
--Now re-enable and re-run (NOTE(review): presumably the recorded failure disabled the chain table - confirm)
UPDATE fact_loader.fact_tables SET enabled = TRUE WHERE fact_table_relid = 'test_fact.orders_fact_chain'::REGCLASS;
SELECT fact_loader.worker();
--We should see nothing here now
SELECT fact_table_id,
fact_table_relid,
messages
FROM fact_loader.unresolved_failures;
--1 row
SELECT order_id, customer_id, order_date, total, is_reorder
FROM test_fact.orders_fact_chain
ORDER BY order_id;
--This is NOT a new feature, just new test coverage: concurrency.
--Session 1 (backgrounded) runs the worker inside a transaction it keeps open for 2 seconds.
--Session 2 runs the worker 1 second later, while session 1 is still open.
--Each stores its worker() result in a new table (try1 / try2) for comparison below.
\! psql contrib_regression -c 'BEGIN; SELECT fact_loader.worker() INTO try1; SELECT pg_sleep(2); COMMIT;' &
SELECT pg_sleep(1);
\! psql contrib_regression -c ' SELECT fact_loader.worker() INTO try2;'
--Give the background session time to commit before reading its result
SELECT pg_sleep(4);
SELECT * FROM try1;
SELECT * FROM try2;
--Daily schedule test - with range format suggestions included!!!
--This kind of table should have a gist exclusion index for the daterange but we won't do it in the test
CREATE TABLE test_fact.daily_customers_fact (LIKE test_fact.customers_fact);
--as_of_date is the validity range of each version of a customer's row; the open version ends at 'infinity'
ALTER TABLE test_fact.daily_customers_fact ADD COLUMN as_of_date daterange;
ALTER TABLE test_fact.daily_customers_fact ADD PRIMARY KEY (customer_id, as_of_date);
--Daily merge for test_fact.daily_customers_fact (one row per customer per validity range).
--Customers whose current attributes in test_fact.customers_fact differ from their open
--('infinity'-ended) version get that version closed at current_date, and a new version
--[current_date, infinity) is opened. Re-running on the same day overwrites today's version.
CREATE FUNCTION test_fact.daily_customers_fact_merge()
RETURNS VOID AS
$BODY$
BEGIN
--Qualify with pg_temp so this can never drop a permanent table named "changes"
--that happens to be on the search_path when no temp table exists yet.
DROP TABLE IF EXISTS pg_temp.changes;
CREATE TEMP TABLE changes AS
SELECT customer_id, phone, age, last_order_id, order_product_count, order_product_promo_ids
FROM test_fact.customers_fact
EXCEPT
SELECT customer_id, phone, age, last_order_id, order_product_count, order_product_promo_ids
FROM test_fact.daily_customers_fact
WHERE upper(as_of_date) = 'infinity';
--Close the open version of every changed customer, except a version that started today.
--Closing that one would give daterange(current_date, current_date) = 'empty'. That would make
--the ON CONFLICT below unreachable, and a second same-day change would then collide on the
--primary key (customer_id, 'empty'). Instead, today's version is overwritten by the upsert.
UPDATE test_fact.daily_customers_fact
SET as_of_date = daterange(lower(as_of_date), current_date)
WHERE customer_id IN (SELECT customer_id FROM pg_temp.changes)
AND upper(as_of_date) = 'infinity'
AND lower(as_of_date) IS DISTINCT FROM current_date;
--Open today's version; if it already exists (same-day rerun), update it in place.
INSERT INTO test_fact.daily_customers_fact (as_of_date, customer_id, phone, age, last_order_id, order_product_count, order_product_promo_ids)
SELECT daterange(current_date,'infinity') AS as_of_date, customer_id, phone, age, last_order_id, order_product_count, order_product_promo_ids
FROM pg_temp.changes
ON CONFLICT (customer_id, as_of_date)
DO UPDATE
SET phone = EXCLUDED.phone,
age = EXCLUDED.age,
last_order_id = EXCLUDED.last_order_id,
order_product_count = EXCLUDED.order_product_count,
order_product_promo_ids = EXCLUDED.order_product_promo_ids;
END;
$BODY$
LANGUAGE plpgsql;
--Disable every fact table registered so far before setting up the daily job
UPDATE fact_loader.fact_tables SET enabled = FALSE;
BEGIN; --Keep the same transaction time (now() is fixed for the whole transaction) to make these tests possible
--Register the daily job with its scheduled time 1 second after the transaction's now()
INSERT INTO fact_loader.fact_tables (fact_table_relid, enabled, priority, use_daily_schedule, daily_scheduled_time, daily_scheduled_tz, daily_scheduled_proid)
VALUES ('test_fact.daily_customers_fact', TRUE, 10, TRUE, now() + interval '1 second', 'America/Chicago', 'test_fact.daily_customers_fact_merge'::REGPROC);
--Redundant: the row above was already inserted with enabled = TRUE
UPDATE fact_loader.fact_tables SET enabled = TRUE WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
/*****
Dependent scheduled job - as of 1.3
Each dependent job below is a trivial procedure that inserts a single row
into its own table, so the test can see exactly when that job has run.
****/
CREATE TABLE silly (id INT);
--Daily job procedure for "silly": records one run as a row with id = 1
CREATE FUNCTION itran() RETURNS VOID LANGUAGE plpgsql AS $BODY$
BEGIN
INSERT INTO silly (id) VALUES (1);
END;
$BODY$;
CREATE TABLE willy (id INT);
--Daily job procedure for "willy": records one run as a row with id = 1
CREATE FUNCTION itrantoo() RETURNS VOID LANGUAGE plpgsql AS $BODY$
BEGIN
INSERT INTO willy (id) VALUES (1);
END;
$BODY$;
CREATE TABLE nilly (id INT);
--Daily job procedure for "nilly": records one run as a row with id = 1
CREATE FUNCTION itrantootoo() RETURNS VOID LANGUAGE plpgsql AS $BODY$
BEGIN
INSERT INTO nilly VALUES (1);
END;
$BODY$;
--Chain of dependent daily jobs. All share daily_customers_fact as their base job;
--parents: silly <- daily_customers_fact, willy <- silly, nilly <- willy.
--They have no schedule time or time zone of their own (NULL).
INSERT INTO fact_loader.fact_tables (fact_table_relid, enabled, priority, use_daily_schedule, daily_scheduled_time, daily_scheduled_tz, daily_scheduled_proid, depends_on_base_daily_job_id, depends_on_parent_daily_job_id)
VALUES ('silly', TRUE, 11, TRUE, NULL, NULL, 'itran'::REGPROC, (SELECT fact_table_id FROM fact_loader.fact_tables WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS), (SELECT fact_table_id FROM fact_loader.fact_tables WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS));
INSERT INTO fact_loader.fact_tables (fact_table_relid, enabled, priority, use_daily_schedule, daily_scheduled_time, daily_scheduled_tz, daily_scheduled_proid, depends_on_base_daily_job_id, depends_on_parent_daily_job_id)
VALUES ('willy', TRUE, 12, TRUE, NULL, NULL, 'itrantoo'::REGPROC, (SELECT fact_table_id FROM fact_loader.fact_tables WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS), (SELECT fact_table_id FROM fact_loader.fact_tables WHERE fact_table_relid = 'silly'::REGCLASS));
INSERT INTO fact_loader.fact_tables (fact_table_relid, enabled, priority, use_daily_schedule, daily_scheduled_time, daily_scheduled_tz, daily_scheduled_proid, depends_on_base_daily_job_id, depends_on_parent_daily_job_id)
VALUES ('nilly', TRUE, 13, TRUE, NULL, NULL, 'itrantootoo'::REGPROC, (SELECT fact_table_id FROM fact_loader.fact_tables WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS), (SELECT fact_table_id FROM fact_loader.fact_tables WHERE fact_table_relid = 'willy'::REGCLASS));
--BELOW: the dependent jobs should run only after the base job has run successfully.
--Should not show the daily job because we set the daily schedule ahead in time
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
UPDATE fact_loader.fact_tables SET daily_scheduled_time = now() - interval '1 second' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
--Now it should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
SELECT fact_loader.worker();
--We have to mock out the date so it appears the same any day we run this test
SELECT daterange('2018-04-15'::DATE + (lower(as_of_date) - current_date),upper(as_of_date)), customer_id, phone, age, last_order_id, order_product_count, order_product_promo_ids
FROM test_fact.daily_customers_fact ORDER BY customer_id, as_of_date;
--Pretend we ran this yesterday (shift every open range to start one day earlier)
UPDATE test_fact.daily_customers_fact SET as_of_date = daterange(lower(as_of_date) - 1,'infinity');
--The base job should not show because it just ran, but its dependent job should
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
--Pretend it ran yesterday
UPDATE fact_loader.fact_tables SET last_refresh_attempted_at = last_refresh_attempted_at - interval '1 day' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
--Job should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
--Change something silly
UPDATE test_fact.customers_fact SET phone = NULL WHERE customer_id = 10;
SELECT fact_loader.worker();
--The base job should not show because it just ran, but its dependent job should
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
--This should run the dependent job
SELECT fact_loader.worker();
TABLE silly;
TABLE willy;
--Now 2nd level dep should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
SELECT fact_loader.worker();
TABLE willy;
TABLE nilly;
--Now 3rd level dep should show
--Just check if enabling regular jobs is ok
--NOTE(review): fact_table_ids 1 and 2 are assumed to be fact tables registered by earlier test files - confirm
UPDATE fact_loader.fact_tables SET enabled = true WHERE fact_table_id IN(1,2);
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
SELECT fact_loader.worker();
TABLE nilly;
UPDATE fact_loader.fact_tables SET enabled = false WHERE fact_table_id IN(1,2);
-- Need to test the next day's run when last_refresh_attempted_at is not null
--Move every daily job's last attempt back one day, then step through the chain one worker() call at a time
UPDATE fact_loader.fact_tables SET last_refresh_attempted_at = last_refresh_attempted_at - interval '1 day' WHERE use_daily_schedule;
SELECT fact_loader.worker();
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
SELECT fact_loader.worker();
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
TABLE silly;
SELECT fact_loader.worker();
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
TABLE willy;
SELECT fact_loader.worker();
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
TABLE nilly;
--We should see one changed range
--We have to mock out the date so it appears the same any day we run this test
SELECT daterange('2018-04-15'::DATE + (lower(as_of_date) - current_date),
CASE
WHEN upper(as_of_date) = 'infinity' THEN 'infinity'
ELSE
'2018-04-15'::DATE + (upper(as_of_date) - current_date) END),
customer_id, phone, age, last_order_id, order_product_count, order_product_promo_ids
FROM test_fact.daily_customers_fact ORDER BY customer_id, as_of_date;
--Verify it still shows if we simulate a job failure
UPDATE fact_loader.fact_tables SET last_refresh_attempted_at = now(), last_refresh_succeeded = FALSE WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
--Here it should not show - if we mark that it did succeed
UPDATE fact_loader.fact_tables SET last_refresh_succeeded = TRUE WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
/*** TEST ADDING DEPS TO SCHEDULED JOBS ***/
--AGAIN Pretend it ran yesterday
UPDATE fact_loader.fact_tables SET last_refresh_attempted_at = last_refresh_attempted_at - interval '1 day' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
--Job should show
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
--Change something silly
UPDATE test_fact.customers_fact SET phone = NULL WHERE customer_id = 10;
--Now add deps that are not met
UPDATE fact_loader.fact_tables SET daily_scheduled_deps = ARRAY['test.customers'::REGCLASS,'test.orders'::REGCLASS, 'test_fact.customers_fact'::REGCLASS], daily_scheduled_dep_delay_tolerance = '1 millisecond' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
--Should fail: the deps cannot all have been updated within the last 1 millisecond
SELECT fact_loader.worker();
--We fail jobs that don't meet deps because as configured, it should be an exceptional occurrence and we want to raise an alarm. Should show an error message containing "Delayed" lingo
SELECT fact_table_id FROM fact_loader.unresolved_failures WHERE messages ->> 'Message' LIKE '%Delayed%';
--Now make the tolerance such that we know the deps are met (and re-enable the job)
UPDATE fact_loader.fact_tables SET enabled = TRUE, daily_scheduled_deps = ARRAY['test.customers'::REGCLASS,'test.orders'::REGCLASS, 'test_fact.customers_fact'::REGCLASS], daily_scheduled_dep_delay_tolerance = '1 minute' WHERE fact_table_relid = 'test_fact.daily_customers_fact'::REGCLASS;
--Shows up again
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
--Succeeds
SELECT fact_loader.worker();
--Does not show now
SELECT fact_table_id FROM fact_loader.prioritized_jobs;
--Discard everything done since BEGIN
ROLLBACK;
|