1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
|
/*
* !!!!!! READ THIS FIRST !!!!!!
* Alternate version of the function that allows PostgreSQL 8.1 to be used as the source database.
* It is not installed as part of the extension, so it can be safely added and removed without affecting the extension, as long as you don't rename it to the original function's name.
* You must do a find-and-replace to set the schema that mimeo is installed in on the destination database.
* I left "@extschema@" in here from the original extension code to provide an easy string to find and replace.
* Just search for that string and replace it with your installation's schema.
*/
-- Refresh a destination table from a PostgreSQL 8.1 source using the source-side queue ("control") table.
--
-- Parameters:
--   p_destination : dest_table value of the refresh_config_dml row to run.
--   p_limit       : when not NULL, overrides the configured batch_limit for this run.
--   p_repull      : when true, truncates the remote queue table and the local table, then
--                   reloads every source row (filtered only by the configured condition).
--   p_debug       : passed to gdb() for debug output; when not true, client_min_messages is
--                   raised to 'warning' for the current transaction.
--
-- Job progress is logged through the pg_jobmon functions (add_job/add_step/update_step/...).
-- A batch that fetches more than 75% of the batch limit finishes the job with fail_job(..., 2)
-- instead of close_job() so the limit can be reviewed.
CREATE OR REPLACE FUNCTION @extschema@.refresh_dml_81(p_destination text, p_limit int default NULL, p_repull boolean DEFAULT false, p_debug boolean DEFAULT false) RETURNS void
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE
    v_adv_lock              boolean;
    v_batch_limit_reached   boolean := false;
    v_cols_n_types          text;
    v_cols                  text;
    v_condition             text;
    v_control               text;
    v_dblink                int;
    v_dblink_name           text;
    v_dblink_schema         text;
    v_delete_sql            text;
    v_dest_table            text;
    v_ex_context            text;
    v_exec_status           text;
    v_fetch_sql             text;
    v_filter                text[];
    v_insert_sql            text;
    v_job_id                int;
    v_jobmon_schema         text;
    v_job_name              text;
    v_limit                 int;
    v_old_search_path       text;
    v_pk_counter            int;
    v_pk_name_csv           text;
    v_pk_name_type_csv      text := '';
    v_pk_name               text[];
    v_pk_type               text[];
    v_pk_where              text := '';
    v_remote_f_sql          text;
    v_remote_q_sql          text;
    v_rowcount              bigint := 0;
    v_source_table          text;
    v_step_id               int;
    v_tmp_table             text;
    v_total                 bigint := 0;
    v_trigger_delete        text;
    v_trigger_update        text;
    v_truncate_remote_q     text;
    v_with_update           text;
BEGIN

IF p_debug IS DISTINCT FROM true THEN
    PERFORM set_config( 'client_min_messages', 'warning', true );
END IF;

v_job_name := 'Refresh DML: '||p_destination;
v_dblink_name := 'mimeo_dml_refresh_'||p_destination;

SELECT n.nspname INTO v_dblink_schema
FROM pg_namespace AS n
INNER JOIN pg_extension AS e ON e.extnamespace = n.oid
WHERE e.extname = 'dblink';

SELECT n.nspname INTO v_jobmon_schema
FROM pg_namespace AS n
INNER JOIN pg_extension AS e ON e.extnamespace = n.oid
WHERE e.extname = 'pg_jobmon';

-- Set custom search path to allow easier calls to other functions, especially job logging.
-- is_local = false changes the session-wide setting, so the saved value is restored on every exit path.
-- Passing values directly to set_config() avoids building (and quoting) a SQL string.
SELECT current_setting('search_path') INTO v_old_search_path;
PERFORM set_config('search_path', '@extschema@,'||v_jobmon_schema||','||v_dblink_schema||',public', false);

SELECT source_table
    , dest_table
    , 'tmp_'||replace(dest_table,'.','_')
    , dblink
    , control
    , pk_name
    , pk_type
    , filter
    , condition
    , batch_limit
FROM refresh_config_dml
WHERE dest_table = p_destination INTO
    v_source_table
    , v_dest_table
    , v_tmp_table
    , v_dblink
    , v_control
    , v_pk_name
    , v_pk_type
    , v_filter
    , v_condition
    , v_limit;
IF NOT FOUND THEN
   RAISE EXCEPTION 'ERROR: no configuration found for %',v_job_name;
END IF;

v_job_id := add_job(v_job_name);
PERFORM gdb(p_debug,'Job ID: '||v_job_id::text);

-- Take advisory lock to prevent multiple calls to function overlapping.
-- This is a session-level lock, so it is released explicitly before every exit below.
v_adv_lock := pg_try_advisory_lock(hashtext('refresh_dml'), hashtext(v_job_name));
IF NOT v_adv_lock THEN
    v_step_id := add_step(v_job_id,'Obtaining advisory lock for job: '||v_job_name);
    PERFORM gdb(p_debug,'Obtaining advisory lock FAILED for job: '||v_job_name);
    PERFORM update_step(v_step_id, 'WARNING','Found concurrent job. Exiting gracefully');
    PERFORM fail_job(v_job_id, 2);
    PERFORM set_config('search_path', v_old_search_path, false);
    RETURN;
END IF;

v_step_id := add_step(v_job_id,'Sanity check primary/unique key values');

IF v_pk_name IS NULL OR v_pk_type IS NULL THEN
    RAISE EXCEPTION 'ERROR: primary key fields in refresh_config_dml must be defined';
END IF;
-- Each pk_name entry needs a matching pk_type entry; a shorter pk_type array would make the
-- column definition list built below NULL and fail later with an unhelpful error.
IF array_length(v_pk_name, 1) IS NULL
    OR array_length(v_pk_name, 1) IS DISTINCT FROM array_length(v_pk_type, 1) THEN
    RAISE EXCEPTION 'ERROR: pk_name and pk_type in refresh_config_dml must be non-empty arrays of the same length';
END IF;

-- Determine the column list and the "name type" list used for dblink_fetch column definitions.
-- Both aggregates come from the same query, so their column order matches.
IF v_filter IS NULL THEN
    SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',')
        INTO v_cols, v_cols_n_types
    FROM pg_attribute WHERE attrelid = p_destination::regclass AND attnum > 0 AND attisdropped is false;
ELSE
    -- Ensure all primary key columns are included in any column filter.
    IF NOT (v_pk_name <@ v_filter) THEN
        RAISE EXCEPTION 'ERROR: filter list did not contain all columns that compose primary/unique key for %',v_job_name;
    END IF;
    SELECT array_to_string(array_agg(attname),','), array_to_string(array_agg(attname||' '||format_type(atttypid, atttypmod)::text),',')
        INTO v_cols, v_cols_n_types
    FROM pg_attribute WHERE attrelid = p_destination::regclass AND ARRAY[attname::text] <@ v_filter AND attnum > 0 AND attisdropped is false;
END IF;

IF p_limit IS NOT NULL THEN
    v_limit := p_limit;
END IF;

-- Build "pk1, pk2" and "pk1 type1, pk2 type2" lists for the dynamic SQL below.
v_pk_name_csv := array_to_string(v_pk_name, ',');
v_pk_counter := 1;
WHILE v_pk_counter <= array_length(v_pk_name,1) LOOP
    IF v_pk_counter > 1 THEN
        v_pk_name_type_csv := v_pk_name_type_csv || ', ';
    END IF;
    v_pk_name_type_csv := v_pk_name_type_csv ||v_pk_name[v_pk_counter]||' '||v_pk_type[v_pk_counter];
    v_pk_counter := v_pk_counter + 1;
END LOOP;

PERFORM update_step(v_step_id, 'OK','Done');

PERFORM dblink_connect(v_dblink_name, auth(v_dblink));

-- Mark a batch of remote queue entries as processed.
v_step_id := add_step(v_job_id,'Updating remote trigger table');
-- Not really a WITH update anymore, but just re-using variable names from original.
-- Marks at most v_limit queue rows (lowest key values first; all rows when no limit is set).
v_with_update := 'UPDATE '||v_control||' SET processed = true FROM (SELECT '||v_pk_name_csv||' FROM '||v_control
    ||' ORDER BY '||v_pk_name_csv||' LIMIT '||COALESCE(v_limit::text, 'ALL')||' ) AS b WHERE ';
v_pk_counter := 1;
WHILE v_pk_counter <= array_length(v_pk_name,1) LOOP
    IF v_pk_counter > 1 THEN
        v_with_update := v_with_update ||' AND ';
    END IF;
    v_with_update := v_with_update ||' '||v_control||'.'||v_pk_name[v_pk_counter]||' = b.'||v_pk_name[v_pk_counter];
    v_pk_counter := v_pk_counter + 1;
END LOOP;
v_trigger_update := 'SELECT dblink_exec('||quote_literal(v_dblink_name)||','|| quote_literal(v_with_update)||')';
PERFORM gdb(p_debug,v_trigger_update);
EXECUTE v_trigger_update INTO v_exec_status;
PERFORM update_step(v_step_id, 'OK','Result was '||v_exec_status);

IF p_repull THEN
    -- Do truncate of remote queue table here before full data pull is actually started to ensure all new changes are recorded.
    PERFORM update_step(v_step_id, 'OK','Request to repull ALL data from source. This could take a while...');
    PERFORM gdb(p_debug, 'Request to repull ALL data from source. This could take a while...');
    v_truncate_remote_q := 'SELECT dblink_exec('||quote_literal(v_dblink_name)||','||quote_literal('TRUNCATE TABLE '||v_control)||')';
    EXECUTE v_truncate_remote_q;

    v_step_id := add_step(v_job_id,'Truncating local table');
    PERFORM gdb(p_debug,'Truncating local table');
    EXECUTE 'TRUNCATE '||v_dest_table;
    PERFORM update_step(v_step_id, 'OK','Done');

    -- Cursor query: every source row, optionally filtered by the configured condition.
    v_remote_f_sql := 'SELECT '||v_cols||' FROM '||v_source_table;
    IF v_condition IS NOT NULL THEN
        v_remote_f_sql := v_remote_f_sql || ' ' || v_condition;
    END IF;
ELSE
    -- The PRIMARY KEY constraint also provides the index used by the DELETE ... USING join below.
    EXECUTE 'CREATE TEMP TABLE '||v_tmp_table||'_queue ('||v_pk_name_type_csv||', PRIMARY KEY ('||v_pk_name_csv||'))';

    -- Copy queue locally for use in removing updated/deleted rows.
    v_remote_q_sql := 'SELECT DISTINCT '||v_pk_name_csv||' FROM '||v_control||' WHERE processed = true';
    PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_q_sql);
    v_step_id := add_step(v_job_id, 'Creating local queue temp table');
    v_rowcount := 0;
    LOOP
        v_fetch_sql := 'INSERT INTO '||v_tmp_table||'_queue ('||v_pk_name_csv||') SELECT '||v_pk_name_csv
            ||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||v_pk_name_type_csv||')';
        EXECUTE v_fetch_sql;
        GET DIAGNOSTICS v_rowcount = ROW_COUNT;
        EXIT WHEN v_rowcount = 0;
        v_total := v_total + coalesce(v_rowcount, 0);
        PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.');
        PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.');
    END LOOP;
    PERFORM dblink_close(v_dblink_name, 'mimeo_cursor');
    EXECUTE 'ANALYZE '||v_tmp_table||'_queue';
    PERFORM update_step(v_step_id, 'OK','Number of rows inserted: '||v_total);
    PERFORM gdb(p_debug,'Temp queue table row count '||v_total::text);

    -- Remove every local row whose key is in the queue; current versions are re-inserted below.
    v_step_id := add_step(v_job_id,'Deleting records from local table');
    v_pk_counter := 1;
    v_delete_sql := 'DELETE FROM '||v_dest_table||' AS a USING '||v_tmp_table||'_queue WHERE ';
    WHILE v_pk_counter <= array_length(v_pk_name,1) LOOP
        IF v_pk_counter > 1 THEN
            v_delete_sql := v_delete_sql ||' AND ';
        END IF;
        v_delete_sql := v_delete_sql ||' a.'||v_pk_name[v_pk_counter]||' = '||v_tmp_table||'_queue.'||v_pk_name[v_pk_counter];
        v_pk_counter := v_pk_counter + 1;
    END LOOP;
    PERFORM gdb(p_debug,v_delete_sql);
    EXECUTE v_delete_sql;
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    PERFORM gdb(p_debug,'Rows removed from local table before applying changes: '||v_rowcount::text);
    PERFORM update_step(v_step_id, 'OK','Removed '||v_rowcount||' records');

    -- Cursor query: source rows whose keys are in the processed part of the queue.
    v_remote_f_sql := 'SELECT '||v_cols||' FROM '||v_source_table||' JOIN ('||v_remote_q_sql||') x USING ('||v_pk_name_csv||')';
    IF v_condition IS NOT NULL THEN
        v_remote_f_sql := v_remote_f_sql || ' ' || v_condition;
    END IF;
END IF;

-- Insert records to local table. Have to do temp table in case destination table is partitioned (returns 0 when inserting to parent).
PERFORM dblink_open(v_dblink_name, 'mimeo_cursor', v_remote_f_sql);
v_step_id := add_step(v_job_id, 'Inserting new records into local table');
EXECUTE 'CREATE TEMP TABLE '||v_tmp_table||'_full ('||v_cols_n_types||')';
v_rowcount := 0;
v_total := 0;
LOOP
    v_fetch_sql := 'INSERT INTO '||v_tmp_table||'_full ('||v_cols||') SELECT '||v_cols
        ||' FROM dblink_fetch('||quote_literal(v_dblink_name)||', ''mimeo_cursor'', 50000) AS ('||v_cols_n_types||')';
    EXECUTE v_fetch_sql;
    GET DIAGNOSTICS v_rowcount = ROW_COUNT;
    -- Cursor exhausted: the staging table was truncated after the previous batch, so nothing is left to copy.
    EXIT WHEN v_rowcount = 0;
    v_total := v_total + coalesce(v_rowcount, 0);
    EXECUTE 'INSERT INTO '||v_dest_table||' ('||v_cols||') SELECT '||v_cols||' FROM '||v_tmp_table||'_full';
    EXECUTE 'TRUNCATE '||v_tmp_table||'_full';
    PERFORM gdb(p_debug,'Fetching rows in batches: '||v_total||' done so far.');
    PERFORM update_step(v_step_id, 'PENDING', 'Fetching rows in batches: '||v_total||' done so far.');
END LOOP;
PERFORM dblink_close(v_dblink_name, 'mimeo_cursor');
PERFORM update_step(v_step_id, 'OK','Number of rows inserted: '||v_total);

-- When no batch limit is configured, v_limit is NULL and this comparison never fires.
IF p_repull = false AND v_total > (v_limit * .75) THEN
    v_step_id := add_step(v_job_id, 'Row count warning');
    PERFORM update_step(v_step_id, 'WARNING','Row count fetched ('||v_total||') greater than 75% of batch limit ('||v_limit||'). Recommend increasing batch limit if possible.');
    v_batch_limit_reached := true;
END IF;

-- Clean out rows from txn table.
v_step_id := add_step(v_job_id,'Cleaning out rows from txn table');
v_trigger_delete := 'SELECT dblink_exec('||quote_literal(v_dblink_name)||','||quote_literal('DELETE FROM '||v_control||' WHERE processed = true')||')';
PERFORM gdb(p_debug,v_trigger_delete);
EXECUTE v_trigger_delete INTO v_exec_status;
PERFORM update_step(v_step_id, 'OK','Result was '||v_exec_status);

-- Update activity status.
v_step_id := add_step(v_job_id,'Updating last_run in config table');
UPDATE refresh_config_dml SET last_run = CURRENT_TIMESTAMP WHERE dest_table = p_destination;
PERFORM update_step(v_step_id, 'OK','Last run was '||CURRENT_TIMESTAMP);

EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_full';
EXECUTE 'DROP TABLE IF EXISTS '||v_tmp_table||'_queue';

PERFORM dblink_disconnect(v_dblink_name);

IF v_batch_limit_reached = false THEN
    PERFORM close_job(v_job_id);
ELSE
    -- Set final job status to level 2 (WARNING) to bring notice that the batch limit was reached and may need adjusting.
    -- Preventive warning to keep replication from falling behind.
    PERFORM fail_job(v_job_id, 2);
END IF;

-- Ensure old search path is reset for the current session.
PERFORM set_config('search_path', v_old_search_path, false);

PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));

EXCEPTION
    WHEN QUERY_CANCELED THEN
        PERFORM set_config('search_path', '@extschema@,'||v_jobmon_schema||','||v_dblink_schema, false);
        IF dblink_get_connections() @> ARRAY[v_dblink_name] THEN
            PERFORM dblink_disconnect(v_dblink_name);
        END IF;
        PERFORM set_config('search_path', v_old_search_path, false);
        -- v_adv_lock is NULL if the error happened before the lock was requested; only release a lock we hold.
        IF v_adv_lock THEN
            PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));
        END IF;
        RAISE EXCEPTION '%', SQLERRM;
    WHEN OTHERS THEN
        PERFORM set_config('search_path', '@extschema@,'||v_jobmon_schema||','||v_dblink_schema, false);
        GET STACKED DIAGNOSTICS v_ex_context = PG_EXCEPTION_CONTEXT;
        IF v_job_id IS NULL THEN
            v_job_id := add_job('Refresh DML: '||p_destination);
            v_step_id := add_step(v_job_id, 'EXCEPTION before job logging started');
        END IF;
        IF v_step_id IS NULL THEN
            v_step_id := add_step(v_job_id, 'EXCEPTION before first step logged');
        END IF;
        IF dblink_get_connections() @> ARRAY[v_dblink_name] THEN
            PERFORM dblink_disconnect(v_dblink_name);
        END IF;
        PERFORM update_step(v_step_id, 'CRITICAL', 'ERROR: '||coalesce(SQLERRM,'unknown'));
        PERFORM fail_job(v_job_id);
        PERFORM set_config('search_path', v_old_search_path, false);
        -- v_adv_lock is NULL if the error happened before the lock was requested; only release a lock we hold.
        IF v_adv_lock THEN
            PERFORM pg_advisory_unlock(hashtext('refresh_dml'), hashtext(v_job_name));
        END IF;
        -- The message format literal spans two lines on purpose; keep its second line at column 0.
        RAISE EXCEPTION '%
CONTEXT: %', SQLERRM, v_ex_context;
END
$$;
|