File: manage_dest_table.sql

package info (click to toggle)
mimeo 1.5.1-20
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,300 kB
  • sloc: sql: 85,916; python: 81; makefile: 23; sh: 16
file content (175 lines) | stat: -rw-r--r-- 6,970 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*
 * Manages creating the destination table and/or returning data about the source columns.
 * p_snap is passed only when a snap table is being managed. It must be either snap1 or snap2
 * (no leading underscore; the function prepends "_" itself when building the table name).
 */ 
CREATE FUNCTION manage_dest_table (
    p_destination text
    , p_snap text
    , p_dblink_name text DEFAULT NULL
    , p_debug boolean DEFAULT false
    , OUT p_table_exists boolean
    , OUT p_cols text[]
    , OUT p_cols_n_types text[]
    , OUT p_source_schema_name text
    , OUT p_source_table_name text) 
RETURNS record
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
-- Looks up the replication config for p_destination, reads the source table's
-- column list over dblink and creates the destination (or snap) table when it
-- does not exist yet.
--
-- Parameters:
--   p_destination  schema-qualified destination table as stored in refresh_config.dest_table
--   p_snap         NULL, 'snap1' or 'snap2'; when set, '_'||p_snap is appended to the table name
--   p_dblink_name  name of an already-open dblink connection to reuse; when NULL a
--                  connection is opened here and closed again before returning
--   p_debug        passed through to gdb() (presumably toggles debug output -- confirm in gdb)
--
-- Output:
--   p_table_exists        whether the destination table existed BEFORE this call
--                         (stays false when the table was created by this call)
--   p_cols                double-quoted source column names
--   p_cols_n_types        double-quoted source column names followed by their type
--   p_source_schema_name  schema of the source table/view as found on the remote side
--   p_source_table_name   name of the source table/view as found on the remote side
--
-- Raises an exception when the destination is not configured, p_snap is invalid,
-- the destination is not schema qualified, the dblink extension or the source
-- object cannot be found, or no source columns are returned.
DECLARE

v_add_logdel_col    text;
v_col_exists        int;
v_create_sql        text;
v_dblink            int;
v_dblink_name       text;
v_dblink_schema     text;
v_dest_full_name    text;   -- destination table name including the optional _snap suffix
v_dest_schema_name  text;
v_dest_table        text;
v_dest_table_name   text;
v_filter            text[];
v_link_exists       boolean;
v_remote_sql        text;
v_old_search_path   text;
v_source_table      text;
v_type              text;

BEGIN

-- Allow re-use of existing remote connection to avoid load of creating another
IF p_dblink_name IS NULL THEN
    v_dblink_name := @extschema@.check_name_length('manage_dest_table_dblink_'||p_destination);
ELSE
    v_dblink_name := p_dblink_name;
END IF;

SELECT n.nspname
INTO v_dblink_schema
FROM pg_catalog.pg_extension e
JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace
WHERE e.extname = 'dblink';

-- Fail with a clear message instead of building NULL dynamic SQL further down
IF v_dblink_schema IS NULL THEN
    RAISE EXCEPTION 'dblink extension is not installed. It is required by manage_dest_table().';
END IF;

-- Remember the caller's search_path so it can be put back before returning
v_old_search_path := current_setting('search_path');
PERFORM set_config('search_path', '@extschema@,'||quote_ident(v_dblink_schema)||',public', false);

SELECT dest_table
    , type
    , dblink
    , filter
INTO v_dest_table
    , v_type
    , v_dblink
    , v_filter
FROM refresh_config
WHERE dest_table = p_destination; 
IF NOT FOUND THEN
   RAISE EXCEPTION 'ERROR: This table is not set up for replication: %', p_destination; 
END IF;

-- Each replication type keeps its source table in its own refresh_config_<type> table
EXECUTE format('SELECT source_table FROM %I WHERE dest_table = %L', 'refresh_config_'||v_type, v_dest_table)
    INTO v_source_table;

IF p_snap IS NOT NULL AND p_snap NOT IN ('snap1', 'snap2') THEN
    RAISE EXCEPTION 'Invalid value for p_snap parameter given to manage_dest_table() function';
END IF;

IF position('.' in v_dest_table) > 0 THEN
    v_dest_schema_name := split_part(v_dest_table, '.', 1); 
    v_dest_table_name := split_part(v_dest_table, '.', 2);
ELSE
    RAISE EXCEPTION 'Destination table set in refresh_config table must be schema qualified. Error in manage_dest_table() call.';
END IF;

-- Single place that decides the physical table name; used by the existence check,
-- CREATE TABLE, the logdel column check and ALTER TABLE so they can never disagree.
v_dest_full_name := v_dest_table_name || COALESCE('_'||p_snap, '');

IF p_dblink_name IS NULL THEN
    PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink));
END IF;

-- The source may be a table or a view; take whichever matches the configured name
SELECT schemaname, tablename INTO p_source_schema_name, p_source_table_name 
    FROM dblink(v_dblink_name, format('
            SELECT schemaname, tablename
            FROM (
                SELECT schemaname, tablename 
                FROM pg_catalog.pg_tables 
                WHERE schemaname ||''.''|| tablename = %L
                UNION
                SELECT schemaname, viewname AS tablename
                FROM pg_catalog.pg_views
                WHERE schemaname || ''.'' || viewname = %L
            ) tables LIMIT 1'
        , v_source_table, v_source_table) )
    t (schemaname text, tablename text);

IF p_source_schema_name IS NULL OR p_source_table_name IS NULL THEN
    RAISE EXCEPTION 'Source table % not found for dblink id %', v_source_table, v_dblink;
END IF;

-- Always return source column info in case extra columns were added to destination. Source columns should not be changed before destination.
-- Double-quotes are added around column names to account for mixed case, special chars or reserved words
-- The regclass literal is built with quote_literal() so identifiers containing a single quote stay valid,
-- and both arrays are ordered by attnum so they follow the source column order deterministically.
v_remote_sql := 'SELECT array_agg(''"''||attname||''"'' ORDER BY attnum) AS cols
    , array_agg(''"''||attname||''"''||'' ''||format_type(atttypid, atttypmod)::text ORDER BY attnum) AS cols_n_types
    FROM pg_attribute
    WHERE attrelid = '||quote_literal(quote_ident(p_source_schema_name)||'.'||quote_ident(p_source_table_name))||'::regclass
    AND attnum > 0
    AND attisdropped is false';
PERFORM gdb(p_debug,'v_remote_sql: '||v_remote_sql);
IF v_filter IS NOT NULL THEN -- Apply column filters if used
    v_remote_sql := v_remote_sql || ' AND ARRAY[attname::text] <@ '||quote_literal(v_filter);
END IF;
-- Wrap the remote statement so it is executed through the dblink connection
v_remote_sql := format('SELECT cols, cols_n_types FROM dblink(%L, %L) t (cols text[], cols_n_types text[])'
    , v_dblink_name, v_remote_sql);
PERFORM gdb(p_debug,'v_remote_sql: '||v_remote_sql);
EXECUTE v_remote_sql INTO p_cols, p_cols_n_types;  
PERFORM gdb(p_debug,'p_cols: {'|| array_to_string(p_cols, ',') ||'}');
PERFORM gdb(p_debug,'p_cols_n_types: {'|| array_to_string(p_cols_n_types, ',') ||'}');

IF p_cols IS NULL OR p_cols_n_types IS NULL THEN
    RAISE EXCEPTION 'Retrieval of source column schema returned NULL. Possible causes are an invalid column filter list.';
END IF;

-- Check for the exact schema/table pair that CREATE TABLE below would produce
SELECT EXISTS (
    SELECT 1
    FROM pg_catalog.pg_tables
    WHERE schemaname = v_dest_schema_name
    AND tablename = v_dest_full_name
) INTO p_table_exists;

IF NOT p_table_exists THEN
    v_create_sql := format('CREATE TABLE %I.%I (%s)'
        , v_dest_schema_name, v_dest_full_name, array_to_string(p_cols_n_types, ','));
    PERFORM gdb(p_debug,'v_create_sql: '||v_create_sql);
    EXECUTE v_create_sql;

    IF v_type = 'logdel' THEN
        -- Look at the table that was just created (including any snap suffix),
        -- i.e. the same table the ALTER TABLE below targets.
        SELECT count(*) INTO v_col_exists 
        FROM pg_catalog.pg_attribute a
        JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
        JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        WHERE n.nspname = v_dest_schema_name
        AND c.relname = v_dest_full_name
        AND a.attname = 'mimeo_source_deleted' 
        AND a.attisdropped = false;
        IF v_col_exists < 1 THEN
            v_add_logdel_col := format('ALTER TABLE %I.%I ADD COLUMN mimeo_source_deleted timestamptz'
                , v_dest_schema_name, v_dest_full_name);
            PERFORM gdb(p_debug, 'v_add_logdel_col : ' || v_add_logdel_col);
            EXECUTE v_add_logdel_col;
        ELSE
            RAISE WARNING 'Special column (mimeo_source_deleted) already exists on destination table (%)', v_dest_table;
        END IF;
    END IF;

END IF;

-- Only close link if dblink name wasn't passed in as parameter
IF p_dblink_name IS NULL THEN
    PERFORM dblink_disconnect(v_dblink_name);
END IF;

PERFORM set_config('search_path', v_old_search_path, false);

EXCEPTION
    WHEN QUERY_CANCELED OR OTHERS THEN
        -- dblink functions are schema-qualified here because the search_path set above
        -- cannot be relied on inside the handler. The NULL guards keep an early failure
        -- (before these variables were assigned) from masking the original error.
        IF v_dblink_schema IS NOT NULL AND v_dblink_name IS NOT NULL THEN
            EXECUTE format('SELECT %I.dblink_get_connections() @> ARRAY[%L]', v_dblink_schema, v_dblink_name)
                INTO v_link_exists;
            IF v_link_exists THEN
                EXECUTE format('SELECT %I.dblink_disconnect(%L)', v_dblink_schema, v_dblink_name);
            END IF;
        END IF;
        IF v_old_search_path IS NOT NULL THEN
            PERFORM set_config('search_path', v_old_search_path, false);
        END IF;
        RAISE EXCEPTION '%', SQLERRM;   
END
$$;