1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
/*
* Check tables on data sources to see if columns have been added or types changed.
* Returns a record value so that a WHERE condition can be used to ignore tables that aren't desired.
* If p_data_source_id value is not given, all configured sources are checked.
*/
CREATE FUNCTION check_source_columns(p_data_source_id int DEFAULT NULL
, OUT dest_schemaname text
, OUT dest_tablename text
, OUT src_schemaname text
, OUT src_tablename text
, OUT missing_column_name text
, OUT missing_column_type text
, OUT data_source int) RETURNS SETOF record
LANGUAGE plpgsql
AS $$
DECLARE
v_dblink_schema text;
v_exists int;
v_row_dblink record;
v_row_table record;
v_row_col record;
v_sql text;
v_src_schema_name text;
v_src_table_name text;
BEGIN
IF p_data_source_id IS NOT NULL THEN
SELECT count(*) INTO v_exists FROM @extschema@.dblink_mapping_mimeo WHERE data_source_id = p_data_source_id;
IF v_exists < 1 THEN
RAISE EXCEPTION 'Given data_source_id (%) does not exist in @extschema@.dblink_mapping_mimeo config table', p_data_source_id;
END IF;
END IF;
SELECT nspname INTO v_dblink_schema FROM pg_namespace n, pg_extension e WHERE e.extname = 'dblink' AND e.extnamespace = n.oid;
FOR v_row_dblink IN SELECT data_source_id FROM @extschema@.dblink_mapping_mimeo
LOOP
-- Allow a parameter to choose which data sources are checked. If parameter is NULL, check them all.
IF p_data_source_id IS NOT NULL THEN
IF p_data_source_id <> v_row_dblink.data_source_id THEN
CONTINUE;
END IF;
END IF;
EXECUTE format('SELECT %I.dblink_connect(%L, @extschema@.auth(%s))', v_dblink_schema, 'mimeo_col_check', v_row_dblink.data_source_id);
FOR v_row_table IN
SELECT source_table, dest_table, filter, type FROM @extschema@.refresh_config WHERE dblink = v_row_dblink.data_source_id ORDER BY 1,2
LOOP
v_sql := format('SELECT schemaname, tablename
FROM (
SELECT schemaname, tablename
FROM pg_catalog.pg_tables
WHERE schemaname ||''.''|| tablename = %L
UNION
SELECT schemaname, viewname AS tablename
FROM pg_catalog.pg_views
WHERE schemaname || ''.'' || viewname = %L
) tables LIMIT 1'
, v_row_table.source_table, v_row_table.source_table);
EXECUTE format('SELECT schemaname, tablename
FROM %I.dblink(%L, %L)
AS (schemaname text, tablename text)'
, v_dblink_schema, 'mimeo_col_check', v_sql)
INTO v_src_schema_name, v_src_table_name;
v_sql := format('SELECT a.attname::text, format_type(a.atttypid, a.atttypmod) AS formatted_type, n.nspname AS schemaname, c.relname AS tablename
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON c.oid = a.attrelid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = %L
AND c.relname = %L
AND a.attnum > 0
AND attisdropped = false
ORDER BY 1,2'
, v_src_schema_name, v_src_table_name);
FOR v_row_col IN EXECUTE
format('SELECT attname, formatted_type, schemaname, tablename
FROM %I.dblink(%L, %L)
AS (attname text, formatted_type text, schemaname text, tablename text)'
, v_dblink_schema, 'mimeo_col_check', v_sql)
LOOP
IF v_row_col.attname <> ANY (v_row_table.filter) THEN
CONTINUE;
END IF;
IF v_row_table.type = 'snap' THEN
SELECT schemaname, viewname INTO dest_schemaname, dest_tablename FROM pg_catalog.pg_views WHERE schemaname||'.'||viewname = v_row_table.dest_table;
ELSE
SELECT schemaname, tablename INTO dest_schemaname, dest_tablename FROM pg_catalog.pg_tables WHERE schemaname||'.'||tablename = v_row_table.dest_table;
END IF;
SELECT count(*) INTO v_exists
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = dest_schemaname
AND c.relname = dest_tablename
AND attnum > 0
AND attisdropped = false
AND attname = v_row_col.attname
AND format_type(atttypid, atttypmod) = v_row_col.formatted_type;
-- if column doesn't exist, means it's missing on destination.
IF v_exists < 1 THEN
src_schemaname := v_row_col.schemaname;
src_tablename := v_row_col.tablename;
missing_column_name := v_row_col.attname;
missing_column_type := v_row_col.formatted_type;
data_source := v_row_dblink.data_source_id;
RETURN NEXT;
ELSE
-- Reset output variables used above
dest_schemaname = NULL;
dest_tablename = NULL;
END IF;
END LOOP; -- end v_row_col
END LOOP; -- end v_row_table
EXECUTE format('SELECT %I.dblink_disconnect(%L)', v_dblink_schema, 'mimeo_col_check');
END LOOP; -- end v_row_dblink
END
$$;
|