
|
/*
 * Plain table refresh function.
 *
 * Empties the destination table and repopulates it from the source table over
 * a named dblink connection, using the settings stored in refresh_config_table.
 *
 * Parameters:
 *   p_destination       Value matched against refresh_config_table.dest_table.
 *   p_truncate_cascade  When not NULL, overrides the configured truncate_cascade
 *                       flag; TRUE appends CASCADE to the TRUNCATE statement.
 *   p_jobmon            When not NULL, overrides the configured jobmon flag.
 *   p_lock_wait         Passed unchanged to concurrent_lock_check(); see that
 *                       function for its exact meaning.
 *   p_debug             Passed unchanged to gdb() together with progress messages.
 *
 * Behaviour:
 *   - Returns without doing any work when concurrent_lock_check() reports FALSE
 *     (recording a WARNING step via pg_jobmon when jobmon logging is enabled).
 *   - Temporarily changes search_path and restores the saved value on success.
 *   - Rows are copied in batches of 50000 through a dblink cursor.
 *   - Sequences listed in the configuration are set to the value returned by
 *     sequence_max_value(); sequences that cannot be found are skipped.
 *
 * Raises:
 *   - When p_destination has no row in refresh_config_table.
 *   - When jobmon logging is requested but the pg_jobmon extension is not found.
 *   - When the dblink extension or the destination table cannot be found.
 *   - When manage_dest_table() returns no source table name.
 *   - Any other error is re-raised as a plain exception carrying the original
 *     message, after the dblink connection is closed and (if enabled) the
 *     failure is recorded through pg_jobmon.
 */
CREATE FUNCTION refresh_table(p_destination text, p_truncate_cascade boolean DEFAULT NULL, p_jobmon boolean DEFAULT NULL, p_lock_wait int DEFAULT NULL, p_debug boolean DEFAULT false) RETURNS void
    LANGUAGE plpgsql
    AS $$
DECLARE
    v_adv_lock          boolean;
    v_cols              text;
    v_cols_n_types      text;
    v_condition         text;
    v_cursor_name       text;
    v_dblink            int;
    v_dblink_name       text;
    v_dblink_schema     text;
    v_dest_schema_name  text;
    v_dest_table        text;
    v_dest_table_name   text;
    v_fetch_sql         text;
    v_job_id            bigint;
    v_job_name          text;
    v_jobmon            boolean;
    v_jobmon_schema     text;
    v_link_exists       boolean;
    v_old_search_path   text;
    v_remote_sql        text;
    v_rowcount          bigint := 0;
    v_seq               text;
    v_seq_max           bigint;
    v_sequences         text[];
    v_source_table      text;
    v_seq_name          text;
    v_seq_oid           oid;
    v_seq_reset         text;
    v_seq_schema        text;
    v_src_schema_name   text;
    v_src_table_name    text;
    v_step_id           bigint;
    v_total             bigint := 0;
    v_truncate_cascade  boolean;
    v_truncate_sql      text;
BEGIN
    -- Take advisory lock to prevent multiple calls to function overlapping
    v_adv_lock := @extschema@.concurrent_lock_check(p_destination, p_lock_wait);
    IF v_adv_lock IS FALSE THEN
        -- This code is known duplication of code below.
        -- This is done in order to keep advisory lock as early in the code as possible
        -- to avoid race conditions and still log if issues are encountered.
        v_job_name := 'Refresh Table: '||p_destination;
        SELECT jobmon INTO v_jobmon FROM @extschema@.refresh_config_table WHERE dest_table = p_destination;
        SELECT n.nspname
        INTO v_jobmon_schema
        FROM pg_catalog.pg_namespace n
        JOIN pg_catalog.pg_extension e ON e.extnamespace = n.oid
        WHERE e.extname = 'pg_jobmon';
        v_jobmon := COALESCE(p_jobmon, v_jobmon);
        IF v_jobmon IS TRUE AND v_jobmon_schema IS NULL THEN
            RAISE EXCEPTION 'jobmon config set to TRUE, but unable to determine if pg_jobmon extension is installed';
        END IF;
        IF v_jobmon THEN
            -- search_path has not been adjusted yet, so pg_jobmon is called schema-qualified.
            EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, v_job_name) INTO v_job_id;
            EXECUTE format('SELECT %I.add_step(%L, %L)', v_jobmon_schema, v_job_id, 'Obtaining advisory lock for job: '||v_job_name) INTO v_step_id;
            EXECUTE format('SELECT %I.update_step(%L, %L, %L)', v_jobmon_schema, v_step_id, 'WARNING', 'Found concurrent job. Exiting gracefully');
            -- NOTE(review): the second argument (2) is presumably pg_jobmon's alert level -- confirm.
            EXECUTE format('SELECT %I.fail_job(%L, %L)', v_jobmon_schema, v_job_id, 2);
        END IF;
        PERFORM @extschema@.gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name);
        RAISE DEBUG 'Found concurrent job. Exiting gracefully';
        RETURN;
    END IF;

    SELECT n.nspname
    INTO v_dblink_schema
    FROM pg_catalog.pg_namespace n
    JOIN pg_catalog.pg_extension e ON e.extnamespace = n.oid
    WHERE e.extname = 'dblink';

    SELECT n.nspname
    INTO v_jobmon_schema
    FROM pg_catalog.pg_namespace n
    JOIN pg_catalog.pg_extension e ON e.extnamespace = n.oid
    WHERE e.extname = 'pg_jobmon';

    IF p_jobmon IS TRUE AND v_jobmon_schema IS NULL THEN
        RAISE EXCEPTION 'p_jobmon parameter set to TRUE, but unable to determine if pg_jobmon extension is installed';
    END IF;

    -- Without this check a missing dblink extension only surfaces later as an
    -- obscure NULL-related error while building the search_path.
    IF v_dblink_schema IS NULL THEN
        RAISE EXCEPTION 'Unable to determine if dblink extension is installed';
    END IF;

    v_dblink_name := @extschema@.check_name_length('mimeo_table_refresh_'||p_destination);
    v_job_name := 'Refresh Table: '||p_destination;

    -- Remember the caller's search_path, then put this extension, pg_jobmon (if
    -- installed) and dblink on the path so their functions can be called unqualified.
    -- Values are passed through %L / quote_ident so unusual schema names cannot
    -- break the generated statement.
    SELECT current_setting('search_path') INTO v_old_search_path;
    EXECUTE format('SELECT set_config(%L, %L, %L)'
        , 'search_path'
        , '@extschema@,'||COALESCE(quote_ident(v_jobmon_schema)||',', '')||quote_ident(v_dblink_schema)||',public'
        , 'false');

    SELECT source_table
        , dest_table
        , dblink
        , condition
        , sequences
        , truncate_cascade
        , jobmon
    INTO v_source_table
        , v_dest_table
        , v_dblink
        , v_condition
        , v_sequences
        , v_truncate_cascade
        , v_jobmon
    FROM refresh_config_table
    WHERE dest_table = p_destination;
    IF NOT FOUND THEN
        RAISE EXCEPTION 'Destination table given in argument (%) is not managed by mimeo.', p_destination;
    END IF;

    -- Allow override with parameter
    v_jobmon := COALESCE(p_jobmon, v_jobmon);
    -- Same check as the lock-failure branch above, applied to the effective setting.
    IF v_jobmon IS TRUE AND v_jobmon_schema IS NULL THEN
        RAISE EXCEPTION 'jobmon config set to TRUE, but unable to determine if pg_jobmon extension is installed';
    END IF;

    IF v_jobmon THEN
        v_job_id := add_job(v_job_name);
        PERFORM gdb(p_debug,'Job ID: '||v_job_id::text);
    END IF;

    SELECT schemaname, tablename
    INTO v_dest_schema_name, v_dest_table_name
    FROM pg_catalog.pg_tables
    WHERE schemaname||'.'||tablename = v_dest_table;
    -- format('%I') rejects NULL identifiers, so report a missing table explicitly.
    IF v_dest_table_name IS NULL THEN
        RAISE EXCEPTION 'Destination table is missing (%)', v_dest_table;
    END IF;

    IF p_truncate_cascade IS NOT NULL THEN
        v_truncate_cascade := p_truncate_cascade;
    END IF;

    IF v_jobmon THEN
        v_step_id := add_step(v_job_id, 'Truncating destination table');
    END IF;

    v_truncate_sql := format('TRUNCATE TABLE %I.%I', v_dest_schema_name, v_dest_table_name);
    IF v_truncate_cascade THEN
        v_truncate_sql := v_truncate_sql || ' CASCADE';
        RAISE NOTICE 'WARNING! If this table had foreign keys, you have just truncated all referencing tables as well!';
        IF v_jobmon THEN
            PERFORM update_step(v_step_id, 'OK', 'If this table had foreign keys, you have just truncated all referencing tables as well!');
        END IF;
    ELSE
        IF v_jobmon THEN
            PERFORM update_step(v_step_id, 'OK', 'Done');
        END IF;
    END IF;
    EXECUTE v_truncate_sql;

    PERFORM dblink_connect(v_dblink_name, @extschema@.auth(v_dblink));

    IF v_jobmon THEN
        v_step_id := add_step(v_job_id,'Grabbing Mapping, Building SQL');
    END IF;

    -- manage_dest_table() supplies the column list, the column/type list and the
    -- source schema/table names used to build the remote and local statements.
    SELECT array_to_string(p_cols, ',')
        , array_to_string(p_cols_n_types, ',')
        , p_source_schema_name
        , p_source_table_name
    INTO v_cols
        , v_cols_n_types
        , v_src_schema_name
        , v_src_table_name
    FROM manage_dest_table(v_dest_table, NULL, v_dblink_name, p_debug);

    IF v_src_table_name IS NULL THEN
        RAISE EXCEPTION 'Source table missing (%)', v_source_table;
    END IF;

    IF v_jobmon THEN
        PERFORM update_step(v_step_id, 'OK', 'Done');
        v_step_id := add_step(v_job_id,'Inserting records into local table');
    END IF;

    -- The column list is passed as a %s argument rather than spliced into the
    -- format string, so a '%' inside a column name is not read as a format specifier.
    v_remote_sql := format('SELECT %s FROM %I.%I', v_cols, v_src_schema_name, v_src_table_name);
    IF v_condition IS NOT NULL THEN
        v_remote_sql := v_remote_sql || ' ' || v_condition;
    END IF;

    -- Ensure name is consistent in case it would get truncated by maximum object name length
    v_cursor_name := @extschema@.check_name_length('mimeo_cursor_' || v_src_table_name, p_convert_standard := true);
    PERFORM dblink_open(v_dblink_name, v_cursor_name, v_remote_sql);

    -- The batch statement is identical for every fetch, so build it once.
    v_fetch_sql := format('INSERT INTO %I.%I(%s) SELECT %s FROM dblink_fetch(%L, %L, %s) AS (%s)'
        , v_dest_schema_name
        , v_dest_table_name
        , v_cols
        , v_cols
        , v_dblink_name
        , v_cursor_name
        , '50000'
        , v_cols_n_types);

    v_rowcount := 0;
    LOOP
        EXECUTE v_fetch_sql;
        GET DIAGNOSTICS v_rowcount = ROW_COUNT;
        -- An empty batch means the remote cursor is exhausted.
        EXIT WHEN v_rowcount = 0;
        v_total := v_total + coalesce(v_rowcount, 0);
        PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.');
        IF v_jobmon THEN
            PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.');
        END IF;
    END LOOP;
    PERFORM dblink_close(v_dblink_name, v_cursor_name);

    IF v_jobmon THEN
        PERFORM update_step(v_step_id, 'OK','Inserted '||v_total||' rows');
    END IF;

    PERFORM dblink_disconnect(v_dblink_name);

    -- Reset any sequences given in the parameter to their new value.
    -- Checks all tables that use the given sequence to ensure it's the max for the entire database.
    IF v_sequences IS NOT NULL THEN
        IF v_jobmon THEN
            v_step_id := add_step(v_job_id, 'Resetting sequences');
        END IF;
        FOREACH v_seq IN ARRAY v_sequences LOOP
            -- Clear the previous iteration's value so a sequence that cannot be
            -- found is skipped instead of reusing a stale maximum.
            v_seq_max := NULL;

            SELECT n.nspname, c.relname, c.oid
            INTO v_seq_schema, v_seq_name, v_seq_oid
            FROM pg_catalog.pg_class c
            JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
            WHERE n.nspname ||'.'|| c.relname = v_seq;

            IF v_seq_oid IS NOT NULL THEN
                v_seq_max := sequence_max_value(v_seq_oid);
            END IF;

            IF v_seq_max IS NOT NULL THEN
                -- The quoted, schema-qualified name is embedded as a literal via %L
                -- so names containing quotes still produce a valid statement.
                v_seq_reset := format('SELECT setval(%L, %L)'
                    , format('%I.%I', v_seq_schema, v_seq_name)
                    , v_seq_max);
                PERFORM gdb(p_debug, 'v_reset_seq: '||v_seq_reset);
                EXECUTE v_seq_reset;
            END IF;
        END LOOP;
        IF v_jobmon THEN
            PERFORM update_step(v_step_id, 'OK', 'Done');
        END IF;
    END IF;

    IF v_jobmon THEN
        v_step_id := add_step(v_job_id,'Updating last_run in config table');
    END IF;

    UPDATE refresh_config_table SET last_run = CURRENT_TIMESTAMP WHERE dest_table = v_dest_table;

    IF v_jobmon THEN
        PERFORM update_step(v_step_id, 'OK','Last run was '||CURRENT_TIMESTAMP);
    END IF;

    IF v_jobmon THEN
        PERFORM close_job(v_job_id);
    END IF;

    -- Put the caller's search_path back.
    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');

EXCEPTION
    WHEN QUERY_CANCELED THEN
        -- Close the named dblink connection if it is still open, then re-raise.
        SELECT n.nspname
        INTO v_dblink_schema
        FROM pg_catalog.pg_namespace n
        JOIN pg_catalog.pg_extension e ON e.extnamespace = n.oid
        WHERE e.extname = 'dblink';
        -- Skip cleanup when dblink is not installed; format('%I') would otherwise
        -- raise on the NULL schema and hide the original error.
        IF v_dblink_schema IS NOT NULL THEN
            EXECUTE format('SELECT %I.dblink_get_connections() @> ARRAY[%L]', v_dblink_schema, v_dblink_name) INTO v_link_exists;
            IF v_link_exists THEN
                EXECUTE format('SELECT %I.dblink_disconnect(%L)', v_dblink_schema, v_dblink_name);
            END IF;
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
    WHEN OTHERS THEN
        -- Everything is looked up again and called schema-qualified here; the
        -- handler does not rely on the search_path set in the main body.
        SELECT n.nspname
        INTO v_dblink_schema
        FROM pg_catalog.pg_namespace n
        JOIN pg_catalog.pg_extension e ON e.extnamespace = n.oid
        WHERE e.extname = 'dblink';
        SELECT n.nspname
        INTO v_jobmon_schema
        FROM pg_catalog.pg_namespace n
        JOIN pg_catalog.pg_extension e ON e.extnamespace = n.oid
        WHERE e.extname = 'pg_jobmon';
        SELECT jobmon INTO v_jobmon FROM @extschema@.refresh_config_table WHERE dest_table = p_destination;
        v_jobmon := COALESCE(p_jobmon, v_jobmon);
        -- Skip cleanup when dblink is not installed; format('%I') would otherwise
        -- raise on the NULL schema and hide the original error.
        IF v_dblink_schema IS NOT NULL THEN
            EXECUTE format('SELECT %I.dblink_get_connections() @> ARRAY[%L]', v_dblink_schema, v_dblink_name) INTO v_link_exists;
            IF v_link_exists THEN
                EXECUTE format('SELECT %I.dblink_disconnect(%L)', v_dblink_schema, v_dblink_name);
            END IF;
        END IF;
        IF v_jobmon AND v_jobmon_schema IS NOT NULL THEN
            -- Make sure a job and a step exist so the failure has somewhere to be recorded.
            IF v_job_id IS NULL THEN
                EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, 'Refresh Table: '||p_destination) INTO v_job_id;
                EXECUTE format('SELECT %I.add_step(%L, %L)', v_jobmon_schema, v_job_id, 'EXCEPTION before job logging started') INTO v_step_id;
            END IF;
            IF v_step_id IS NULL THEN
                EXECUTE format('SELECT %I.add_step(%L, %L)', v_jobmon_schema, v_job_id, 'EXCEPTION before first step logged') INTO v_step_id;
            END IF;
            EXECUTE format('SELECT %I.update_step(%L, %L, %L)', v_jobmon_schema, v_step_id, 'CRITICAL', 'ERROR: '||COALESCE(SQLERRM,'unknown'));
            EXECUTE format('SELECT %I.fail_job(%L)', v_jobmon_schema, v_job_id);
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
END
$$;
|