File: load.sql

package info (click to toggle)
pg-fact-loader 2.0.1-5
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,884 kB
  • sloc: sql: 28,911; sh: 157; makefile: 26
file content (78 lines) | stat: -rw-r--r-- 3,015 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
CREATE OR REPLACE FUNCTION fact_loader.load(p_fact_table_id INT)
RETURNS VOID AS
$BODY$
DECLARE
    v_process_queue_sql text;
    v_execute_sql text;
    v_metadata_update_sql text;
    v_pre_execute_hook_sql text = '';
BEGIN
/***
Run one load for the fact table identified by p_fact_table_id.

Parameters:
    p_fact_table_id - fact_table_id of the row in fact_loader.fact_tables to load.

Returns nothing; all work happens through the dynamically executed statements.

There are 3 basic steps to this load:
    1. Gather all queue table changes and insert them into a consolidated process_queue
    2. Build the SQL from the process_queue and execute it
    3. Update the metadata indicating the last records updated for both the queue tables and fact table

An optional pre-execute hook, read from fact_loader.fact_tables, runs between steps 1 and 2.

Every dynamic statement is wrapped in COALESCE with a harmless fallback because
EXECUTE rejects a NULL command string.
 */

/****
Get SQL to insert new data into the consolidated process_queue,
and SQL to update metadata for last_cutoffs.
 */
SELECT process_queue_sql, metadata_update_sql
INTO v_process_queue_sql, v_metadata_update_sql
FROM fact_loader.sql_builder(p_fact_table_id);

/****
Populate the consolidated queue
This just creates a temp table with all changes to be processed
(the process_queue relation read further down is expected to come from this statement).
 */
RAISE DEBUG 'Populating Queue for fact_table_id %: %', p_fact_table_id, v_process_queue_sql;
EXECUTE COALESCE(v_process_queue_sql, $$SELECT 'No queue data' AS result$$);

/****
 Pre-execute hook
 If no fact_tables row matches, or the column is NULL, the variable ends up NULL
 and the no-op fallback below is executed instead.
 */
SELECT pre_execute_hook_sql INTO v_pre_execute_hook_sql
FROM fact_loader.fact_tables
WHERE fact_table_id = p_fact_table_id;

EXECUTE COALESCE(v_pre_execute_hook_sql, $$SELECT 'No pre-execute hook.' AS result$$);

/****
For DEBUG purposes only to view the actual process_queue.
The copy is made only when log_min_messages is set to exactly debug3;
no other debug level triggers it.
 */
IF current_setting('log_min_messages') = 'debug3' THEN
    INSERT INTO fact_loader.debug_process_queue (process_queue_id, fact_table_id, proid, key_value, row_created_at, row_updated_at, source_change_date)
    -- the row timestamps are not populated, so we set them here
    SELECT process_queue_id, fact_table_id, proid, key_value, now(), now(), source_change_date FROM process_queue;
END IF;

/****
With data now in the process_queue, the execute_queue function builds the SQL to execute.
Save this SQL in a variable and execute it.
If there is no data to execute, this is a no-op select statement.
 */
SELECT sql INTO v_execute_sql FROM fact_loader.execute_queue(p_fact_table_id);
RAISE DEBUG 'Executing Queue for fact_table_id %: %', p_fact_table_id, v_execute_sql;
EXECUTE COALESCE(v_execute_sql, $$SELECT 'No queue data to execute' AS result$$);

/****
With everything finished, we now update the metadata for the fact_table.
Even if no data was processed, we will still move forward last_refresh_attempted_at:
when sql_builder supplied no metadata update, the fallback statement below is run.

last_refresh_succeeded will be marked true always for now.  It could in the future
be used to indicate a failure in case of a caught error.
 */
RAISE DEBUG 'Updating metadata for fact_table_id %: %', p_fact_table_id, v_metadata_update_sql;
EXECUTE COALESCE(v_metadata_update_sql,
    format(
    $$UPDATE fact_loader.fact_tables ft
        SET last_refresh_attempted_at = now(),
          last_refresh_succeeded = TRUE
     WHERE fact_table_id = %s;
    $$, p_fact_table_id));

END;
$BODY$
LANGUAGE plpgsql;