File: queue_table_delay_info.sql

package info (click to toggle)
pg-fact-loader 2.0.1-5
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,884 kB
  • sloc: sql: 28,911; sh: 157; makefile: 26
file content (82 lines) | stat: -rw-r--r-- 3,046 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
CREATE OR REPLACE FUNCTION fact_loader.queue_table_delay_info()
RETURNS TABLE("publication_name" text,
           "queue_of_base_table_relid" regclass,
           "publisher" name,
           "source_time" timestamp with time zone)
AS
$BODY$
/***
This function exists to allow no necessary dependency
to exist on pglogical_ticker.  If the extension is used,
it will return data from its native functions, if not,
it will return a null data set matching the structure
***/
BEGIN

IF EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pglogical_ticker') THEN
    RETURN QUERY EXECUTE $$
    -- pglogical
    SELECT
        unnest(coalesce(subpublications,'{NULL}')) AS publication_name
      , qt.queue_of_base_table_relid
      , n.if_name AS publisher
      , t.source_time
    FROM fact_loader.queue_tables qt
      JOIN fact_loader.logical_subscription() s ON qt.pglogical_node_if_id = s.subid AND s.driver = 'pglogical'
      JOIN pglogical.node_interface n ON n.if_id = qt.pglogical_node_if_id
      JOIN pglogical_ticker.all_subscription_tickers() t ON t.provider_name = n.if_name
    UNION ALL
    -- native logical
    SELECT
        unnest(coalesce(subpublications,'{NULL}')) AS publication_name
      , qt.queue_of_base_table_relid
      , t.db AS publisher
      , t.tick_time AS source_time
    FROM fact_loader.queue_tables qt
      JOIN fact_loader.subscription_rel() psr ON psr.srrelid = qt.queue_table_relid
      JOIN fact_loader.logical_subscription() s ON psr.srsubid = s.subid
      JOIN logical_ticker.tick t ON t.db = s.dbname
    UNION ALL
    -- local
    SELECT
        NULL::text AS publication_name
      , qt.queue_of_base_table_relid
      , NULL::name AS publisher
      , now() AS source_time
    FROM fact_loader.queue_tables qt
    WHERE qt.pglogical_node_if_id IS NULL
        AND NOT EXISTS (
        SELECT 1
        FROM fact_loader.subscription_rel() psr WHERE psr.srrelid = qt.queue_table_relid
    );$$;
ELSE
    RETURN QUERY
    -- local
    SELECT
        NULL::TEXT AS publication_name
      , qt.queue_of_base_table_relid
      , NULL::NAME AS publisher
      --source_time is now() if queue tables are not pglogical-replicated, which is assumed if no ticker
      , now() AS source_time
    FROM fact_loader.queue_tables qt
    WHERE NOT EXISTS (SELECT 1 FROM fact_loader.subscription_rel() psr WHERE psr.srrelid = qt.queue_table_relid)
    UNION ALL
    -- native logical
    (WITH logical_subscription_with_db AS (
    SELECT *, (regexp_matches(subconninfo, 'dbname=(.*?)(?=\s|$)'))[1] AS db
    FROM fact_loader.logical_subscription()
    )
    SELECT
        unnest(coalesce(subpublications,'{NULL}')) AS publication_name
      , qt.queue_of_base_table_relid
      , t.db AS publisher
      , t.tick_time AS source_time
    FROM fact_loader.queue_tables qt
      JOIN fact_loader.subscription_rel() psr ON psr.srrelid = qt.queue_table_relid
      JOIN logical_subscription_with_db s ON psr.srsubid = s.subid
      JOIN logical_ticker.tick t ON t.db = s.db);
END IF;

END;
$BODY$
LANGUAGE plpgsql;