File: 15_source_change_date.sql

package info (click to toggle)
pg-fact-loader 2.0.1-5
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,884 kB
  • sloc: sql: 28,911; sh: 157; makefile: 26
file content (182 lines) | stat: -rw-r--r-- 7,021 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
SET client_min_messages TO warning;
--This is for testing functionality of timezone-specific timestamps
SET TIMEZONE TO 'America/Chicago';

/****
This example tests not only using the queue_table timestamp to build a date-based table,
but doing that in a different time zone than we might expect, just to show the functionality.

So we are going to show a history table of customers from the perspective of the UK
 */
CREATE TABLE test_fact.customers_history_uktime_fact (customer_id INT, as_of_date DATERANGE, customer_number text, phone TEXT, age INT, PRIMARY KEY (customer_id, as_of_date));
CREATE FUNCTION test_fact.customers_history_uktime_fact_merge(p_customer_id INT, p_as_of_date DATE)
RETURNS VOID AS
$BODY$
BEGIN

  WITH it_really_changed AS (
  SELECT customer_id, daterange(p_as_of_date, 'infinity') AS as_of_date, customer_number, phone, age FROM test.customers WHERE customer_id = p_customer_id
  EXCEPT
  SELECT customer_id, as_of_date, customer_number, phone, age FROM test_fact.customers_history_uktime_fact
  WHERE customer_id = p_customer_id
    AND upper(as_of_date) = 'infinity'
  )

  , ended_last_fact AS
  (UPDATE test_fact.customers_history_uktime_fact f
      SET as_of_date = daterange(lower(f.as_of_date), lower(irc.as_of_date))
   FROM it_really_changed irc
    WHERE f.customer_id = irc.customer_id
      AND lower(f.as_of_date) <> lower(irc.as_of_date)
      AND upper(f.as_of_date) = 'infinity'
    RETURNING *)

  INSERT INTO test_fact.customers_history_uktime_fact AS f
    (customer_id, as_of_date, customer_number, phone, age)
  SELECT
    customer_id,
    as_of_date,
    customer_number,
    phone,
    age
  FROM it_really_changed nes
  ON CONFLICT (customer_id, as_of_date)
  DO UPDATE
  SET
    customer_number = EXCLUDED.customer_number
    , phone = EXCLUDED.phone
    , age = EXCLUDED.age;

END;
$BODY$
LANGUAGE plpgsql;

CREATE FUNCTION test_fact.customers_history_uktime_fact_delete(p_customer_id INT)
RETURNS VOID AS
$BODY$
BEGIN

DELETE FROM test_fact.customers_history_uktime_fact WHERE customer_id = p_customer_id;

END;
$BODY$
LANGUAGE plpgsql;

INSERT INTO fact_loader.fact_tables
(fact_table_relid, priority)
VALUES ('test_fact.customers_history_uktime_fact'::REGCLASS, 8);

WITH queue_tables_with_proids AS (
  SELECT
    *,
    'test_fact.customers_history_uktime_fact_merge'::REGPROC AS insert_merge_proid,
    'test_fact.customers_history_uktime_fact_merge'::REGPROC AS update_merge_proid,
    'test_fact.customers_history_uktime_fact_delete'::REGPROC AS delete_merge_proid
  FROM fact_loader.queue_tables
  WHERE queue_of_base_table_relid IN
    /***
    These are the tables that are involved in test_fact.customers_fact_aggregator
    Find this out for each function in order to properly configure all possible changes
    that could affect the tables
     */
        ('test.customers'::REGCLASS)
  )

  INSERT INTO fact_loader.queue_table_deps
  (fact_table_id, queue_table_id, insert_merge_proid, update_merge_proid, delete_merge_proid)
  SELECT
    fact_table_id, queue_tables_with_proids.queue_table_id, insert_merge_proid, update_merge_proid, delete_merge_proid
  FROM fact_loader.fact_tables
  CROSS JOIN queue_tables_with_proids
  WHERE fact_table_relid = 'test_fact.customers_history_uktime_fact'::REGCLASS;

  --Key retrieval for updates
  INSERT INTO fact_loader.key_retrieval_sequences (
    queue_table_dep_id,
    filter_scope,
    level,
    return_columns,
    is_fact_key,
    join_to_relation,
    join_to_column,
    return_columns_from_join,
    join_return_is_fact_key,
    pass_queue_table_change_date_at_tz)
  SELECT
    queue_table_dep_id,
    evts.evt,
    1,
    '{customer_id}'::name[],
    true,
    null,
    null,
    null::name[],
    null::boolean,
    --THIS is the key of which time zone the date is seen from
    'Europe/London'::TEXT
  FROM fact_loader.queue_table_deps qtd
  INNER JOIN fact_loader.queue_tables qt USING (queue_table_id)
  INNER JOIN fact_loader.fact_tables ft USING (fact_table_id)
  CROSS JOIN (VALUES ('I'),('U')) evts (evt)
  WHERE fact_table_relid = 'test_fact.customers_history_uktime_fact'::REGCLASS
    AND queue_of_base_table_relid IN('test.customers'::REGCLASS)
  UNION ALL
  SELECT
    queue_table_dep_id,
    'D',
    1,
    '{customer_id}'::name[],
    true,
    null,
    null,
    null::name[],
    null::boolean,
    null::TEXT
  FROM fact_loader.queue_table_deps qtd
  INNER JOIN fact_loader.queue_tables qt USING (queue_table_id)
  INNER JOIN fact_loader.fact_tables ft USING (fact_table_id)
  WHERE fact_table_relid = 'test_fact.customers_history_uktime_fact'::REGCLASS
    AND queue_of_base_table_relid IN('test.customers'::REGCLASS);

SELECT test_fact.customers_history_uktime_fact_merge(customer_id, '2018-04-22'::DATE)
FROM test.customers;

UPDATE test.customers SET customer_number = customer_number||'a' WHERE customer_id BETWEEN 1 AND 5;
UPDATE test.customers SET customer_number = customer_number||'b' WHERE customer_id BETWEEN 1 AND 5;
UPDATE test.customers SET customer_number = customer_number||'c' WHERE customer_id BETWEEN 6 AND 10;
UPDATE test.customers SET customer_number = customer_number||'d' WHERE customer_id BETWEEN 6 AND 10;
UPDATE test.customers SET customer_number = customer_number||'e' WHERE customer_id BETWEEN 1 AND 5;

/****
Now we have to mock that this actually happened on different days.
 */
UPDATE test_audit_raw.customers_audit SET changed_at = '2018-04-24'::DATE WHERE change ->> 'customer_number' ~ '1a$';
UPDATE test_audit_raw.customers_audit SET changed_at = '2018-04-24'::DATE WHERE change ->> 'customer_number' ~ '1ab$';
UPDATE test_audit_raw.customers_audit SET changed_at = '2018-04-25'::DATE WHERE change ->> 'customer_number' ~ '1c$';
UPDATE test_audit_raw.customers_audit SET changed_at = '2018-04-26'::DATE WHERE change ->> 'customer_number' ~ '1cd$';
UPDATE test_audit_raw.customers_audit SET changed_at = '2018-04-27'::DATE WHERE change ->> 'customer_number' ~ '1abe$';

--Ensure this one table is prioritized
UPDATE fact_loader.fact_tables SET force_worker_priority = TRUE, enabled = TRUE WHERE fact_table_relid = 'test_fact.customers_history_uktime_fact'::REGCLASS;

SELECT test.tick();

DO $$
BEGIN
    IF NOT (SELECT COUNT(1) FROM fact_loader.gathered_queued_changes((SELECT fact_table_id FROM fact_loader.fact_tables WHERE fact_table_relid = 'test_fact.customers_history_uktime_fact'::REGCLASS))) = 18 THEN
        RAISE EXCEPTION '%', 'No worky';
    END IF;
END$$;

SELECT fact_loader.worker();

SELECT * FROM test_fact.customers_history_uktime_fact ORDER BY upper(as_of_date), customer_id;

--Let's verify the current records are the same as the actual table
SELECT customer_id, customer_number, phone, age FROM test.customers
INTERSECT
SELECT customer_id, customer_number, phone, age FROM test_fact.customers_history_uktime_fact
WHERE upper(as_of_date) = 'infinity'
ORDER BY customer_id;

UPDATE fact_loader.fact_tables SET force_worker_priority = FALSE WHERE fact_table_relid = 'test_fact.customers_history_uktime_fact'::REGCLASS;