1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
/*
* Manages creating destination table and/or returning data about the columns.
* p_snap parameter is passed if snap table is being managed. Should be equal to either _snap1 or _snap2.
*/
CREATE FUNCTION manage_dest_table (
p_destination text
, p_snap text
, p_dblink_name text DEFAULT NULL
, p_debug boolean DEFAULT false
, OUT p_table_exists boolean
, OUT p_cols text[]
, OUT p_cols_n_types text[]
, OUT p_source_schema_name text
, OUT p_source_table_name text)
RETURNS record
LANGUAGE plpgsql SECURITY DEFINER
AS $$
DECLARE
v_add_logdel_col text;
v_col_exists int;
v_condition text;
v_create_sql text;
v_dblink int;
v_dblink_name text;
v_dblink_schema text;
v_dest_schema_name text;
v_dest_table text;
v_dest_table_name text;
v_filter text[];
v_link_exists boolean;
v_remote_sql text;
v_old_search_path text;
v_source_table text;
v_type text;
BEGIN
-- Allow re-use of existing remote connection to avoid load of creating another
IF p_dblink_name IS NULL THEN
v_dblink_name := @extschema@.check_name_length('manage_dest_table_dblink_'||p_destination);
ELSE
v_dblink_name := p_dblink_name;
END IF;
SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
SELECT current_setting('search_path') INTO v_old_search_path;
EXECUTE 'SELECT set_config(''search_path'',''@extschema@,'||v_dblink_schema||',public'',''false'')';
SELECT dest_table
, type
, dblink
, filter
, condition
INTO v_dest_table
, v_type
, v_dblink
, v_filter
, v_condition
FROM refresh_config
WHERE dest_table = p_destination;
IF NOT FOUND THEN
RAISE EXCEPTION 'ERROR: This table is not set up for replication: %', p_destination;
END IF;
EXECUTE 'SELECT source_table FROM refresh_config_'||v_type||' WHERE dest_table = '||quote_literal(v_dest_table) INTO v_source_table;
IF p_snap IS NOT NULL AND p_snap NOT IN ('snap1', 'snap2') THEN
RAISE EXCEPTION 'Invalid value for p_snap parameter given to manage_dest_table() function';
END IF;
IF position('.' in v_dest_table) > 0 THEN
v_dest_schema_name := split_part(v_dest_table, '.', 1);
v_dest_table_name := split_part(v_dest_table, '.', 2);
ELSE
RAISE EXCEPTION 'Destination table set in refresh_config table must be schema qualified. Error in manage_dest_table() call.';
END IF;
IF p_dblink_name IS NULL THEN
PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink));
END IF;
SELECT schemaname, tablename INTO p_source_schema_name, p_source_table_name
FROM dblink(v_dblink_name, format('
SELECT schemaname, tablename
FROM (
SELECT schemaname, tablename
FROM pg_catalog.pg_tables
WHERE schemaname ||''.''|| tablename = %L
UNION
SELECT schemaname, viewname AS tablename
FROM pg_catalog.pg_views
WHERE schemaname || ''.'' || viewname = %L
) tables LIMIT 1'
, v_source_table, v_source_table) )
t (schemaname text, tablename text);
IF p_source_schema_name IS NULL OR p_source_table_name IS NULL THEN
RAISE EXCEPTION 'Source table % not found for dblink id %', v_source_table, v_dblink;
END IF;
-- Always return source column info in case extra columns were added to destination. Source columns should not be changed before destination.
-- Double-quotes are added around column names to account for mixed case, special chars or reserved words
v_remote_sql := 'SELECT array_agg(''"''||attname||''"'') as cols
, array_agg(''"''||attname||''"''||'' ''||format_type(atttypid, atttypmod)::text) as cols_n_types
FROM pg_attribute
WHERE attrelid = '''||quote_ident(p_source_schema_name)||'.'||quote_ident(p_source_table_name)||'''::regclass
AND attnum > 0
AND attisdropped is false';
PERFORM gdb(p_debug,'v_remote_sql: '||v_remote_sql);
IF v_filter IS NOT NULL THEN -- Apply column filters if used
v_remote_sql := v_remote_sql || ' AND ARRAY[attname::text] <@ '||quote_literal(v_filter);
END IF;
v_remote_sql := 'SELECT cols, cols_n_types FROM dblink('||quote_literal(v_dblink_name)||', ' || quote_literal(v_remote_sql) || ') t (cols text[], cols_n_types text[])';
PERFORM gdb(p_debug,'v_remote_sql: '||v_remote_sql);
EXECUTE v_remote_sql INTO p_cols, p_cols_n_types;
PERFORM gdb(p_debug,'p_cols: {'|| array_to_string(p_cols, ',') ||'}');
PERFORM gdb(p_debug,'p_cols_n_types: {'|| array_to_string(p_cols_n_types, ',') ||'}');
IF p_cols IS NULL OR p_cols_n_types IS NULL THEN
RAISE EXCEPTION 'Retrieval of source column schema returned NULL. Possible causes are an invalid column filter list.';
END IF;
SELECT
CASE
WHEN count(1) > 0 THEN true
ELSE false
END
INTO p_table_exists FROM pg_catalog.pg_tables WHERE schemaname ||'.'|| tablename = v_dest_table || COALESCE('_'||p_snap, '');
IF p_table_exists = false THEN
v_create_sql := format('CREATE TABLE %I.%I', v_dest_schema_name, v_dest_table_name || COALESCE('_'||p_snap, ''));
v_create_sql := v_create_sql || ' (' || array_to_string(p_cols_n_types, ',') || ')';
perform gdb(p_debug,'v_create_sql: '||v_create_sql::text);
EXECUTE v_create_sql;
IF v_type = 'logdel' THEN
SELECT count(*) INTO v_col_exists
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = v_dest_schema_name
AND c.relname = v_dest_table_name
AND a.attname = 'mimeo_source_deleted'
AND a.attisdropped = false;
IF v_col_exists < 1 THEN
v_add_logdel_col := format('ALTER TABLE %I.%I ADD COLUMN mimeo_source_deleted timestamptz', v_dest_schema_name, v_dest_table_name || COALESCE('_'||p_snap, ''));
PERFORM gdb(p_debug, 'v_add_logdel_col : ' || v_add_logdel_col);
EXECUTE v_add_logdel_col;
ELSE
RAISE WARNING 'Special column (mimeo_source_deleted) already exists on destination table (%)', v_dest_table;
END IF;
END IF;
END IF;
-- Only close link if dblink name wasn't passed in as parameter
IF p_dblink_name IS NULL THEN
PERFORM dblink_disconnect(v_dblink_name);
END IF;
EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')';
EXCEPTION
WHEN QUERY_CANCELED OR OTHERS THEN
EXECUTE 'SELECT '||v_dblink_schema||'.dblink_get_connections() @> ARRAY['||quote_literal(v_dblink_name)||']' INTO v_link_exists;
IF v_link_exists THEN
EXECUTE 'SELECT '||v_dblink_schema||'.dblink_disconnect('||quote_literal(v_dblink_name)||')';
END IF;
EXECUTE 'SELECT set_config(''search_path'','''||v_old_search_path||''',''false'')';
RAISE EXCEPTION '%', SQLERRM;
END
$$;
|