File: create_index.sql

package info (click to toggle)
mimeo 1.5.1-20
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,300 kB
  • sloc: sql: 85,916; python: 81; makefile: 23; sh: 16
file content (139 lines) | stat: -rw-r--r-- 7,043 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
 * Create index(es) on destination table
 */
CREATE FUNCTION create_index(p_destination text, p_source_schema_name text, p_source_table_name text, p_snap text DEFAULT NULL, p_debug boolean DEFAULT false) RETURNS void
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE

v_conf              text;
v_dblink            int;
v_dblink_name       text;
v_dblink_schema     text;
v_dest_schema_name  text;
v_dest_table        text;
v_dest_table_name   text;
v_filter            text;
v_link_exists       boolean;
v_old_search_path   text;
v_repl_index        oid;
v_remote_index_sql  text;
v_row               record;
v_source_table      text;
v_statement         text;
v_type              text;

BEGIN

v_dblink_name := @extschema@.check_name_length('create_index_dblink_'||p_destination);
SELECT nspname INTO v_dblink_schema FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_dblink_schema||''',''false'')';

SELECT dest_table
    , type
    , dblink
    , filter
INTO v_dest_table
    , v_type
    , v_dblink
    , v_filter
FROM refresh_config
WHERE dest_table = p_destination; 
IF NOT FOUND THEN
   RAISE EXCEPTION 'ERROR: This table is not set up for replication: %', p_destination; 
END IF;

EXECUTE 'SELECT source_table FROM refresh_config_'||v_type||' WHERE dest_table = '||quote_literal(v_dest_table) INTO v_source_table;

IF p_snap IS NOT NULL AND p_snap NOT IN ('snap1', 'snap2') THEN
    RAISE EXCEPTION 'Invalid value for p_snap parameter given to create_index() function';
END IF;

PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink));
-- Reset search_path on remote connection to ensure schema is included in table name in index creation statement
-- set_config returns a record value, so can't just use dblink_exec
SELECT set_config INTO v_conf FROM dblink(v_dblink_name, 'SELECT set_config(''search_path'', '''', false)::text') t (set_config text);

SELECT schemaname, tablename INTO v_dest_schema_name, v_dest_table_name FROM pg_catalog.pg_tables WHERE schemaname||'.'||tablename = v_dest_table||COALESCE('_'||p_snap, '');

-- Gets primary key or unique index used by updater/dml/logdel replication (same function is called in their makers). 
-- Should only loop once, but just easier to keep code consistent with below method
FOR v_row IN SELECT indexrelid, key_type, indkey_names, statement FROM fetch_replication_key(p_source_schema_name, p_source_table_name, v_dblink_name, p_debug)
LOOP

    EXIT WHEN v_row.indexrelid IS NULL; -- function still returns a row full of nulls when nothing found

    IF v_row.key_type = 'primary' THEN
        v_statement := format('ALTER TABLE %I.%I ADD CONSTRAINT %I PRIMARY KEY ("'||array_to_string(v_row.indkey_names, '","')||'")'
            , v_dest_schema_name, v_dest_table_name, v_dest_table_name ||'_'||array_to_string(v_row.indkey_names, '_')||'_pk');
        PERFORM gdb(p_debug, 'primary key statement: '||v_statement);
    ELSIF v_row.key_type = 'unique' THEN
        v_statement := v_row.statement;
        -- Replace source table name with destination
        v_statement := regexp_replace(
            v_statement
            , ' ON "?'||p_source_schema_name||'"?."?'||p_source_table_name||'"?'
            , ' ON '||quote_ident(v_dest_schema_name)||'.'||quote_ident(v_dest_table_name));
        -- If source index name contains the table name, replace it with the destination table.
        v_statement := regexp_replace(v_statement, '(INDEX \w*)'||p_source_table_name||'(\w* ON)', '\1'||v_dest_table_name||'\2');
        -- If it's a snap table, prepend to ensure unique index name (may cause snap1/2 to be in index name twice, but too complicated to fix that.
        -- This is done separately from above replace because it must always be done even if the index name doesn't contain the source table
        IF p_snap IS NOT NULL THEN
            v_statement := regexp_replace(v_statement, 'UNIQUE INDEX ("?)', 'UNIQUE INDEX \1'||p_snap||'_');
        END IF;
        PERFORM gdb(p_debug, 'unique key statement: ' || v_statement);
    END IF;
    EXECUTE v_statement;
    v_repl_index = v_row.indexrelid;
END LOOP;

-- Get all indexes other than one obtained above. 
-- Cannot set these indexes when column filters are in use because there's no easy way to check columns in expression indexes.
IF v_filter IS NULL THEN
    v_remote_index_sql := 'select c.relname AS src_table, pg_get_indexdef(i.indexrelid) as statement
        FROM pg_catalog.pg_index i
        JOIN pg_catalog.pg_class c ON i.indrelid = c.oid 
        JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
        WHERE n.nspname = '||quote_literal(p_source_schema_name)||'
        AND c.relname = '||quote_literal(p_source_table_name)||'
        AND i.indisprimary IS false
        AND i.indisvalid';
    IF v_repl_index IS NOT NULL THEN
        v_remote_index_sql := v_remote_index_sql ||' AND i.indexrelid <> '||v_repl_index;
    END IF;

    FOR v_row IN EXECUTE 'SELECT src_table, statement FROM dblink('||quote_literal(v_dblink_name)||', '||quote_literal(v_remote_index_sql)||') t (src_table text, statement text)' LOOP
        v_statement := v_row.statement;
        -- Replace source table name with destination
        v_statement := regexp_replace(
            v_statement
            , ' ON "?'||p_source_schema_name||'"?."?'||p_source_table_name||'"?'
            , ' ON '||quote_ident(v_dest_schema_name)||'.'||quote_ident(v_dest_table_name));
        -- If source index name contains the table name, replace it with the destination table.
        v_statement := regexp_replace(v_statement, '(INDEX \w*)'||p_source_table_name||'(\w* ON)', '\1'||v_dest_table_name||'\2');
        -- If it's a snap table, prepend to ensure unique index name (may cause snap1/2 to be in index name twice, but too complicated to fix that.
        -- This is done separately from above replace because it must always be done even if the index name doesn't contain the source table
        IF p_snap IS NOT NULL THEN
            v_statement := regexp_replace(v_statement, 'E INDEX ("?)', 'E INDEX \1'||p_snap||'_');
        END IF;
        PERFORM gdb(p_debug, 'normal index statement: ' || v_statement);
        EXECUTE v_statement;
    END LOOP;
END IF;

PERFORM dblink_disconnect(v_dblink_name);

EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')';

EXCEPTION
    WHEN QUERY_CANCELED OR OTHERS THEN
        SELECT nspname INTO v_dblink_schema FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
        EXECUTE format('SELECT %I.dblink_get_connections() @> ARRAY[%L]', v_dblink_schema, v_dblink_name) INTO v_link_exists;
        IF v_link_exists THEN
            EXECUTE format('SELECT %I.dblink_disconnect(%L)', v_dblink_schema, v_dblink_name);
        END IF;
        RAISE EXCEPTION '%', SQLERRM; 
END
$$;