1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
CREATE OR REPLACE FUNCTION fact_loader.try_load(p_fact_table_id INT)
RETURNS BOOLEAN AS
$BODY$
/***
This will be used by the worker, but can also be used safely if a DBA
wants to run a job manually.
*/
DECLARE
c_lock_cutoff_refresh INT = 99995;
v_err JSONB;
v_errmsg TEXT;
v_errdetail TEXT;
v_errhint TEXT;
v_errcontext TEXT;
BEGIN
-- We except rare serialization failures here which we will ignore and move to the next record
-- Anything else should be raised
BEGIN
IF EXISTS (SELECT fact_table_id
FROM fact_loader.fact_tables
WHERE fact_table_id = p_fact_table_id
FOR UPDATE SKIP LOCKED) THEN
/****
Attempt to refresh fact_table_dep_queue_table_deps or ignore if refresh is in progress.
*/
IF (SELECT pg_try_advisory_xact_lock(c_lock_cutoff_refresh)) THEN
PERFORM fact_loader.refresh_fact_table_dep_queue_table_deps();
END IF;
--Load fact table and handle exceptions to auto-disable job and log errors in case of error
BEGIN
--Scheduled daily job
IF (SELECT use_daily_schedule
FROM fact_loader.fact_tables
WHERE fact_table_id = p_fact_table_id) THEN
PERFORM fact_loader.daily_scheduled_load(p_fact_table_id);
--Queue-based job
ELSE
PERFORM fact_loader.load(p_fact_table_id);
/***
Run purge process. This need not run every launch of worker but it should not hurt.
It is better for it to run after the fact table load is successful so as to avoid a
rollback and more dead bloat
*/
PERFORM fact_loader.purge_queues();
END IF;
RETURN TRUE;
EXCEPTION
WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
v_errmsg = MESSAGE_TEXT,
v_errdetail = PG_EXCEPTION_DETAIL,
v_errhint = PG_EXCEPTION_HINT,
v_errcontext = PG_EXCEPTION_CONTEXT;
UPDATE fact_loader.fact_tables
SET last_refresh_succeeded = FALSE,
last_refresh_attempted_at = now(),
enabled = FALSE
WHERE fact_table_id = p_fact_table_id;
v_err = jsonb_strip_nulls(
jsonb_build_object(
'Message', v_errmsg,
'Detail', case when v_errdetail = '' then null else v_errdetail end,
'Hint', case when v_errhint = '' then null else v_errhint end,
'Context', case when v_errcontext = '' then null else v_errcontext end)
);
INSERT INTO fact_loader.fact_table_refresh_logs (fact_table_id, refresh_attempted_at, refresh_finished_at, messages)
VALUES (p_fact_table_id, now(), clock_timestamp(), v_err);
RETURN FALSE;
END;
ELSE
RETURN FALSE;
END IF;
EXCEPTION
WHEN serialization_failure THEN
RAISE LOG 'Serialization failure on transaction % attempting to lock % - skipping.', txid_current()::text, p_fact_table_id::text;
RETURN FALSE;
WHEN OTHERS THEN
RAISE;
END;
END;
$BODY$
LANGUAGE plpgsql;
|