File: 05_pgl_setup.sql

package info (click to toggle)
pg-fact-loader 2.0.1-5
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,884 kB
  • sloc: sql: 28,911; sh: 157; makefile: 26
file content (178 lines) | stat: -rw-r--r-- 5,791 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
-- Read test configuration from the environment via psql backtick (shell) expansion:
--   d = replication driver under test, from $TESTDRIVER (defaults to 'pglogical')
--   x = drop-extension flag, from $TESTDROPEXT (defaults to 'false')
\set d `echo ${TESTDRIVER:-pglogical}`
\set x `echo ${TESTDROPEXT:-false}`
-- Only emit messages of severity warning and above for the rest of the session
SET client_min_messages TO warning;
--This is for testing functionality of timezone-specific timestamps
SET TIMEZONE TO 'America/Chicago';

-- Create the pglogical node 'test'. The function result is captured into a temp
-- table that is dropped right away; presumably this keeps the returned value
-- out of the test output (TODO confirm).
SELECT pglogical.create_node('test','host=localhost') INTO TEMP foonode;
DROP TABLE foonode;

-- Create the pglogical replication set 'test1' (generate_series(1,1) yields a
-- single row, so the CTE holds exactly one name) replicating insert, update,
-- delete and truncate. Nothing is created when a set of that name already
-- exists. As above, the result lands in a temp table that is dropped at once.
WITH sets AS (
SELECT 'test'||generate_series AS set_name
FROM generate_series(1,1)
)

SELECT pglogical.create_replication_set
(set_name:=s.set_name
,replicate_insert:=TRUE
,replicate_update:=TRUE
,replicate_delete:=TRUE
,replicate_truncate:=TRUE) AS result
INTO TEMP repsets
FROM sets s
WHERE NOT EXISTS (
SELECT 1
FROM pglogical.replication_set
WHERE set_name = s.set_name);

DROP TABLE repsets;

-- native equivalent: a publication carrying the same name as the replication set.
-- Note that it publishes insert, update and delete only (truncate is not listed).
CREATE PUBLICATION test1 WITH (publish = 'insert,update,delete');

-- NOTE(review): presumably creates pglogical_ticker's per-replication-set ticker
-- tables (such as pglogical_ticker.test1) -- verify against pglogical_ticker.
SELECT pglogical_ticker.deploy_ticker_tables();

-- native equivalent: a plain tick table keyed by database name. db is the
-- primary key and defaults to current_database(), so a default insert targets
-- the single row belonging to this database.
CREATE SCHEMA IF NOT EXISTS logical_ticker;
CREATE TABLE IF NOT EXISTS logical_ticker.tick (
    db text DEFAULT current_database() NOT NULL PRIMARY KEY,
    tick_time TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
    tier SMALLINT DEFAULT 1 NULL
);

--As of pglogical_ticker 1.2, we don't tick tables not in replication uselessly, but this
--would break our tests which did exactly that.  So we can fix the test breakage by just adding these tables
--to replication as they would be on an actual provider
SELECT pglogical_ticker.add_ticker_tables_to_replication();
--The tests will manually run tick() before new data is needed

-- native equivalent: add the tick table to the test1 publication
ALTER PUBLICATION test1 ADD TABLE logical_ticker.tick;
-- Copy the psql variables into a temp table so the anonymous block below can
-- read them: psql does not interpolate :variables inside a dollar-quoted body.
--   driver   : replication driver under test ('native' or anything else, which
--              is treated as pglogical)
--   drop_ext : whether to drop the pglogical extension once native mocks exist
CREATE TEMP TABLE vars AS SELECT :'d'::text as driver, :'x'::boolean as drop_ext;

-- Driver-specific setup.
--   native    : publish the test tables, define test.tick() against
--               logical_ticker.tick, and replace fact_loader.subscription() /
--               fact_loader.subscription_rel() with mocks backed by local tables.
--   otherwise : point fact_loader.queue_tables at the pglogical node interface
--               and define test.tick() as a wrapper around pglogical_ticker.tick().
DO $$
DECLARE v_record RECORD;
BEGIN

IF (SELECT driver FROM vars) = 'native' THEN
    -- Add every table of the two test schemas to the test1 publication.
    -- %I quotes each identifier when needed, so names containing upper-case
    -- or special characters are handled instead of producing broken DDL.
    FOR v_record IN
        SELECT schemaname, tablename
        FROM pg_tables
        WHERE schemaname IN('test', 'test_audit_raw')
    LOOP
        EXECUTE format('ALTER PUBLICATION test1 ADD TABLE %I.%I', v_record.schemaname, v_record.tablename);
    END LOOP;

    -- Native tick: upsert the single row for this database (db defaults to
    -- current_database() and is the conflict target).
    CREATE OR REPLACE FUNCTION test.tick() RETURNS VOID AS $BODY$
    BEGIN
    INSERT INTO logical_ticker.tick (tick_time) VALUES (now()) ON CONFLICT (db) DO UPDATE SET tick_time = now();
    END;$BODY$
    LANGUAGE plpgsql;

    -- Local table that the mocked fact_loader.subscription() reads from.
    CREATE TABLE public.mock_pg_subscription (
        oid oid NOT NULL,
        subdbid oid NOT NULL,
        subname name NOT NULL,
        subowner oid NOT NULL,
        subenabled boolean NOT NULL,
        subconninfo text NOT NULL,
        subslotname name NOT NULL,
        subsynccommit text NOT NULL,
        subpublications text[] NOT NULL
    );
    -- One fake subscription to publication test1. The oid (10000) and owner
    -- (16384) look like arbitrary fixture values -- TODO confirm.
    -- Schema-qualified so the insert does not depend on search_path.
    INSERT INTO public.mock_pg_subscription (oid, subdbid, subname, subowner, subenabled, subconninfo, subslotname, subsynccommit, subpublications)
    VALUES (10000, (SELECT oid FROM pg_database WHERE datname = current_database()), 'test', 16384, true, 'host=example.com dbname=contrib_regression', 'test', 'off', '{test1}');

    -- Mock: return subscriptions from the local mock table.
    CREATE OR REPLACE FUNCTION fact_loader.subscription()
    RETURNS TABLE (oid OID, subpublications text[], subconninfo text)
    AS $BODY$
    BEGIN

    RETURN QUERY
    SELECT s.oid, s.subpublications, s.subconninfo FROM public.mock_pg_subscription s;

    END;
    $BODY$
    LANGUAGE plpgsql;

    -- Local table that the mocked fact_loader.subscription_rel() reads from.
    CREATE TABLE public.mock_pg_subscription_rel (
        srsubid oid NOT NULL,
        srrelid oid NOT NULL,
        srsubstate "char" NOT NULL,
        srsublsn pg_lsn NOT NULL
    );
    -- Register every ordinary table (relkind 'r') of the test schemas under
    -- the mock subscription, with state 'r' and LSN 0/0.
    INSERT INTO public.mock_pg_subscription_rel (srsubid, srrelid, srsubstate, srsublsn)
    SELECT (SELECT oid FROM public.mock_pg_subscription LIMIT 1), c.oid, 'r', '0/0'
    FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname IN('test', 'test_audit_raw') AND c.relkind = 'r';

    -- Mock: return subscription/table pairs from the local mock table.
    CREATE OR REPLACE FUNCTION fact_loader.subscription_rel()
    RETURNS TABLE (srsubid OID, srrelid OID)
    AS $BODY$
    BEGIN

    RETURN QUERY
    SELECT sr.srsubid, sr.srrelid FROM public.mock_pg_subscription_rel sr;

    END;
    $BODY$
    LANGUAGE plpgsql;

    -- Optionally remove pglogical, together with everything depending on it.
    IF (SELECT drop_ext FROM vars) THEN
        DROP EXTENSION pglogical CASCADE;
    END IF;

ELSE
    -- No WHERE clause: every row of fact_loader.queue_tables is updated.
    -- The scalar subquery raises an error if pglogical.node_interface holds
    -- more than one row.
    UPDATE fact_loader.queue_tables SET pglogical_node_if_id = (SELECT if_id FROM pglogical.node_interface);
    -- pglogical tick: delegate to pglogical_ticker.
    CREATE OR REPLACE FUNCTION test.tick() RETURNS VOID AS $BODY$
    BEGIN
    PERFORM pglogical_ticker.tick();
    END;$BODY$
    LANGUAGE plpgsql;
END IF;

END$$;


/***
Test override of pglogical_ticker.all_subscription_tickers(): the rows come
straight from the local pglogical_ticker.test1 table, and set_name is fixed
to 'test1' for every row.
 */
CREATE OR REPLACE FUNCTION pglogical_ticker.all_subscription_tickers()
RETURNS TABLE (provider_name NAME, set_name NAME, source_time TIMESTAMPTZ)
LANGUAGE plpgsql
AS
$BODY$
BEGIN
    RETURN QUERY
    SELECT
        ticker.provider_name,
        CAST('test1' AS NAME) AS set_name,
        ticker.source_time
    FROM pglogical_ticker.test1 AS ticker;
END;
$BODY$;

/***
Test override of fact_loader.logical_subscription().

Without the pglogical extension only the native subscriptions reported by
fact_loader.subscription() are returned. With it, one row per
pglogical.node_interface entry comes first (publications fixed to {test1},
conninfo and dbname NULL, driver 'pglogical'), followed by the same native rows.

dbname is pulled out of subconninfo with a regular expression.
NOTE(review): regexp_matches() is set-returning, so a subscription whose
conninfo contains no dbname= produces no row at all -- confirm that is intended.
 */
CREATE OR REPLACE FUNCTION fact_loader.logical_subscription()
RETURNS TABLE (subid OID, subpublications text[], subconninfo text, dbname text, driver fact_loader.driver)
LANGUAGE plpgsql
AS $BODY$
BEGIN

  IF NOT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pglogical') THEN

    -- pglogical is not installed: native subscriptions only
    RETURN QUERY
    SELECT
      sub.oid,
      sub.subpublications,
      sub.subconninfo,
      (regexp_matches(sub.subconninfo, 'dbname=(.*?)(?=\s|$)'))[1] AS dbname,
      'native'::fact_loader.driver AS driver
    FROM fact_loader.subscription() AS sub;

  ELSE

    -- pglogical is installed: node interfaces plus native subscriptions.
    -- The statement text is handed to EXECUTE as a string.
    RETURN QUERY EXECUTE $$
      SELECT if_id AS subid, '{test1}'::text[] as subpublications, null::text AS subconninfo, null::text AS dbname, 'pglogical'::fact_loader.driver AS driver
      FROM pglogical.node_interface
      UNION ALL
      SELECT s.oid, s.subpublications, s.subconninfo, (regexp_matches(s.subconninfo, 'dbname=(.*?)(?=\s|$)'))[1] AS dbname, 'native'::fact_loader.driver AS driver
      FROM fact_loader.subscription() s;
      $$;

  END IF;

END;
$BODY$;