
|
CREATE FUNCTION @extschema@.undo_partition(
    p_parent_table text
    , p_target_table text
    , p_loop_count int DEFAULT 1
    , p_batch_interval text DEFAULT NULL
    , p_keep_table boolean DEFAULT true
    , p_lock_wait numeric DEFAULT 0
    , p_ignored_columns text[] DEFAULT NULL
    , p_drop_cascade boolean DEFAULT false
    , OUT partitions_undone int
    , OUT rows_undone bigint
)
    RETURNS record
    LANGUAGE plpgsql
    AS $$
DECLARE

ex_context                      text;
ex_detail                       text;
ex_hint                         text;
ex_message                      text;
v_adv_lock                      boolean;
v_batch_interval_id             bigint;
v_batch_interval_time           interval;
v_batch_loop_count              int := 0;
v_child_loop_total              bigint := 0;
v_child_table                   text;
v_column_list                   text;
v_control                       text;
v_control_type                  text;
v_time_encoder                  text;
v_time_decoder                  text;
v_child_min_id                  bigint;
v_child_min_time                timestamptz;
v_epoch                         text;
v_jobmon                        boolean;
v_jobmon_schema                 text;
v_job_id                        bigint;
v_inner_loop_count              int;
v_lock_iter                     int := 1;
v_lock_obtained                 boolean := FALSE;
v_new_search_path               text;
v_old_search_path               text;
v_parent_schema                 text;
v_parent_tablename              text;
v_partition_expression          text;
v_partition_interval            text;
v_row                           record;
v_rowcount                      bigint;
v_sql                           text;
v_step_id                       bigint;
v_sub_count                     int;
v_target_schema                 text;
v_target_tablename              text;
v_template_schema               text;
v_template_siblings             int;
v_template_table                text;
v_template_tablename            text;
v_total                         bigint := 0;
v_undo_count                    int := 0;

BEGIN
/*
 * Moves data to new, target table since data cannot be moved elsewhere in the same partition set.
 * Leaves old parent table as is and does not change name of new table.
 *
 * Parameters:
 *   p_parent_table     Schema-qualified parent table; must have a row in part_config and exist in the catalogs.
 *   p_target_table     Schema-qualified, already existing table that receives the rows. Must differ from the parent.
 *   p_loop_count       Number of batches to move before the function stops.
 *   p_batch_interval   Size of one batch; cast to interval (time/text/uuid/epoch control) or bigint (id control).
 *                      Defaults to the partition interval stored in part_config.
 *   p_keep_table       When false, an emptied child table is dropped after being detached.
 *   p_lock_wait        When > 0, locks are attempted with NOWAIT and retried, sleeping p_lock_wait / 5 seconds
 *                      between attempts.
 *   p_ignored_columns  Column names left out of the column list used to move rows.
 *   p_drop_cascade     Adds CASCADE to the DROP TABLE issued when p_keep_table is false.
 *
 * Results:
 *   partitions_undone  Number of child tables detached, or -1 when the advisory lock or a table/row lock
 *                      could not be obtained (rows_undone is left unset in that case).
 *   rows_undone        Total number of rows moved to the target table.
 *
 * Child tables are addressed using the parent table's schema.
 */

-- Only one undo_partition call may run at a time; the lock is released at transaction end.
v_adv_lock := pg_try_advisory_xact_lock(hashtext('pg_partman undo_partition'));
IF NOT v_adv_lock THEN
    RAISE NOTICE 'undo_partition already running.';
    partitions_undone = -1;
    RETURN;
END IF;

IF p_parent_table = p_target_table THEN
    RAISE EXCEPTION 'Target table cannot be the same as the parent table';
END IF;

SELECT partition_interval::text
    , control
    , time_encoder
    , time_decoder
    , jobmon
    , epoch
    , template_table
INTO v_partition_interval
    , v_control
    , v_time_encoder
    , v_time_decoder
    , v_jobmon
    , v_epoch
    , v_template_table
FROM @extschema@.part_config
WHERE parent_table = p_parent_table;

IF v_control IS NULL THEN
    RAISE EXCEPTION 'No configuration found for pg_partman for given parent table: %', p_parent_table;
END IF;

IF p_target_table IS NULL THEN
    RAISE EXCEPTION 'The p_target_table option must be set when undoing a partitioned table';
END IF;

SELECT n.nspname, c.relname
INTO v_parent_schema, v_parent_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_parent_table, '.', 1)::name
AND c.relname = split_part(p_parent_table, '.', 2)::name;

IF v_parent_tablename IS NULL THEN
    RAISE EXCEPTION 'Given parent table not found in system catalogs: %', p_parent_table;
END IF;

SELECT general_type INTO v_control_type FROM @extschema@.check_control_type(v_parent_schema, v_parent_tablename, v_control);

-- Batch size is an interval for anything that resolves to a timestamp, a bigint for plain id partitioning.
IF v_control_type IN ('time', 'text', 'uuid') OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
    IF p_batch_interval IS NULL THEN
        v_batch_interval_time := v_partition_interval::interval;
    ELSE
        v_batch_interval_time := p_batch_interval::interval;
    END IF;
ELSIF v_control_type = 'id' THEN
    IF p_batch_interval IS NULL THEN
        v_batch_interval_id := v_partition_interval::bigint;
    ELSE
        v_batch_interval_id := p_batch_interval::bigint;
    END IF;
ELSE
    RAISE EXCEPTION 'Data type of control column in given partition set must be either date/time or integer.';
END IF;

-- Put the extension schema (and pg_jobmon's, when used) in front of the caller's search_path.
-- The original value is restored on every non-error exit below.
SELECT current_setting('search_path') INTO v_old_search_path;
IF length(v_old_search_path) > 0 THEN
    v_new_search_path := '@extschema@,pg_temp,'||v_old_search_path;
ELSE
    v_new_search_path := '@extschema@,pg_temp';
END IF;
IF v_jobmon THEN
    SELECT nspname INTO v_jobmon_schema FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'pg_jobmon'::name AND e.extnamespace = n.oid;
    IF v_jobmon_schema IS NOT NULL THEN
        v_new_search_path := format('%s,%s',v_jobmon_schema, v_new_search_path);
    END IF;
END IF;
EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_new_search_path, 'false');

-- Check if any child tables are themselves partitioned or part of an inheritance tree. Prevent undo at this level if so.
-- Need to lock child tables at all levels before multi-level undo can be performed safely.
FOR v_row IN
    SELECT partition_schemaname, partition_tablename FROM @extschema@.show_partitions(p_parent_table)
LOOP
    SELECT count(*) INTO v_sub_count
    FROM pg_catalog.pg_inherits i
    JOIN pg_catalog.pg_class c ON i.inhparent = c.oid
    JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    WHERE c.relname = v_row.partition_tablename::name
    AND n.nspname = v_row.partition_schemaname::name;
    IF v_sub_count > 0 THEN
        RAISE EXCEPTION 'Child table for this parent has child table(s) itself (%). Run undo partitioning on this table to ensure all data is properly moved to target table', v_row.partition_schemaname||'.'||v_row.partition_tablename;
    END IF;
END LOOP;

SELECT n.nspname, c.relname
INTO v_target_schema, v_target_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_target_table, '.', 1)::name
AND c.relname = split_part(p_target_table, '.', 2)::name;

IF v_target_tablename IS NULL THEN
    RAISE EXCEPTION 'Given target table not found in system catalogs: %', p_target_table;
END IF;

IF v_jobmon_schema IS NOT NULL THEN
    v_job_id := add_job(format('PARTMAN UNDO PARTITIONING: %s', p_parent_table));
    v_step_id := add_step(v_job_id, format('Undoing partitioning for table %s', p_parent_table));
END IF;

-- Expression that turns the control column into the value the batch boundaries are compared against.
v_partition_expression := CASE
    WHEN v_epoch = 'seconds' THEN format('to_timestamp(%I)', v_control)
    WHEN v_epoch = 'milliseconds' THEN format('to_timestamp((%I/1000)::float)', v_control)
    WHEN v_epoch = 'microseconds' THEN format('to_timestamp((%I/1000000)::float)', v_control)
    WHEN v_epoch = 'nanoseconds' THEN format('to_timestamp((%I/1000000000)::float)', v_control)
    ELSE format('%I', v_control)
END;

-- Stops new time partitions from being made as well as stopping child tables from being dropped if they were configured with a retention period.
UPDATE @extschema@.part_config SET undo_in_progress = true WHERE parent_table = p_parent_table;

IF v_jobmon_schema IS NOT NULL THEN
    PERFORM update_step(v_step_id, 'OK', 'Stopped partition creation process.');
END IF;

-- Generate column list to use in SELECT/INSERT statements below. Allows for exclusion of GENERATED (or any other desired) columns.
SELECT string_agg(quote_ident(attname), ',')
INTO v_column_list
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = v_target_schema
AND c.relname = v_target_tablename
AND a.attnum > 0
AND a.attisdropped = false
AND attname <> ALL(COALESCE(p_ignored_columns, ARRAY[]::text[]));

<<outer_child_loop>>
LOOP
    -- Get ordered list of child table in set. Store in variable one at a time per loop until none are left or batch count is reached.
    -- This easily allows it to loop over same child table until empty or move onto next child table after it's dropped
    -- Include the default table to ensure all data there is removed as well
    SELECT partition_tablename INTO v_child_table FROM @extschema@.show_partitions(p_parent_table, 'ASC', p_include_default := TRUE) LIMIT 1;

    EXIT outer_child_loop WHEN v_child_table IS NULL;

    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, format('Removing child partition: %s.%s', v_parent_schema, v_child_table));
    END IF;

    IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
        EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
    ELSIF (v_control_type IN ('text', 'uuid')) THEN
        --- This can pass NULL to decoder function
        EXECUTE format('SELECT %s((SELECT min(%s::text) FROM %I.%I))', v_time_decoder, v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
    ELSIF v_control_type = 'id' THEN
        EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_id;
    END IF;

    IF v_child_min_time IS NULL AND v_child_min_id IS NULL THEN
        -- No rows left in this child table. Remove from partition set.

        -- lockwait timeout for table drop
        IF p_lock_wait > 0 THEN
            -- v_lock_iter runs 0..5 inclusive, i.e. up to 6 attempts with a p_lock_wait / 5 sleep after each failure.
            v_lock_iter := 0;
            WHILE v_lock_iter <= 5 LOOP
                v_lock_iter := v_lock_iter + 1;
                BEGIN
                    EXECUTE format('LOCK TABLE ONLY %I.%I IN ACCESS EXCLUSIVE MODE NOWAIT', v_parent_schema, v_child_table);
                    v_lock_obtained := TRUE;
                EXCEPTION
                    WHEN lock_not_available THEN
                        PERFORM pg_sleep( p_lock_wait / 5.0 );
                        CONTINUE;
                END;
                EXIT WHEN v_lock_obtained;
            END LOOP;
            IF NOT v_lock_obtained THEN
                RAISE NOTICE 'Unable to obtain lock on child table for removal from partition set';
                -- Put the caller's search_path back; the set_config() above is session-level and would otherwise leak.
                EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
                partitions_undone = -1;
                RETURN;
            END IF;
        END IF; -- END p_lock_wait IF
        v_lock_obtained := FALSE; -- reset for reuse later

        v_sql := format('ALTER TABLE %I.%I DETACH PARTITION %I.%I'
                        , v_parent_schema
                        , v_parent_tablename
                        , v_parent_schema
                        , v_child_table);
        EXECUTE v_sql;

        IF p_keep_table = false THEN
            v_sql := 'DROP TABLE %I.%I';
            IF p_drop_cascade THEN
                v_sql := v_sql || ' CASCADE';
            END IF;
            EXECUTE format(v_sql, v_parent_schema, v_child_table);
            IF v_jobmon_schema IS NOT NULL THEN
                PERFORM update_step(v_step_id, 'OK', format('Child table DROPPED. Moved %s rows to target table', v_child_loop_total));
            END IF;
        ELSE
            IF v_jobmon_schema IS NOT NULL THEN
                PERFORM update_step(v_step_id, 'OK', format('Child table DETACHED/UNINHERITED from parent, not DROPPED. Moved %s rows to target table', v_child_loop_total));
            END IF;
        END IF;

        -- Reset so a following child that is already empty does not report this child's row count.
        v_child_loop_total := 0;
        v_undo_count := v_undo_count + 1;
        EXIT outer_child_loop WHEN v_batch_loop_count >= p_loop_count; -- Exit outer FOR loop if p_loop_count is reached
        CONTINUE outer_child_loop; -- skip data moving steps below
    END IF;

    v_inner_loop_count := 1;
    v_child_loop_total := 0;
    <<inner_child_loop>>
    LOOP
        IF v_control_type IN ('time', 'text', 'uuid') OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
            -- do some locking with timeout, if required
            IF p_lock_wait > 0 THEN
                -- Up to 6 attempts (v_lock_iter 0..5), sleeping p_lock_wait / 5 after each failed one.
                v_lock_iter := 0;
                WHILE v_lock_iter <= 5 LOOP
                    v_lock_iter := v_lock_iter + 1;
                    BEGIN
                        -- The predicate must match the DELETE below, so it goes through v_partition_expression:
                        -- for epoch-based id columns the raw column is an integer and cannot be compared to a timestamp.
                        IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
                            EXECUTE format('SELECT * FROM %I.%I WHERE %s <= %L FOR UPDATE NOWAIT'
                                , v_parent_schema
                                , v_child_table
                                , v_partition_expression
                                , v_child_min_time + (v_batch_interval_time * v_inner_loop_count));
                        ELSIF (v_control_type IN ('text', 'uuid')) THEN
                            EXECUTE format('SELECT * FROM %I.%I WHERE %s <= %s(%L) FOR UPDATE NOWAIT'
                                , v_parent_schema
                                , v_child_table
                                , v_partition_expression
                                , v_time_encoder
                                , v_child_min_time + (v_batch_interval_time * v_inner_loop_count));
                        END IF;
                        v_lock_obtained := TRUE;
                    EXCEPTION
                        WHEN lock_not_available THEN
                            PERFORM pg_sleep( p_lock_wait / 5.0 );
                            CONTINUE;
                    END;
                    EXIT WHEN v_lock_obtained;
                END LOOP;
                IF NOT v_lock_obtained THEN
                    RAISE NOTICE 'Unable to obtain lock on batch of rows to move';
                    -- Restore the caller's search_path before leaving early.
                    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
                    partitions_undone = -1;
                    RETURN;
                END IF;
            END IF;

            -- Get everything from the current child minimum up to the multiples of the given interval
            IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
                EXECUTE format('WITH move_data AS (
                                        DELETE FROM %I.%I WHERE %s <= %L RETURNING %s )
                                    INSERT INTO %I.%I (%5$s) SELECT %5$s FROM move_data'
                    , v_parent_schema
                    , v_child_table
                    , v_partition_expression
                    , v_child_min_time + (v_batch_interval_time * v_inner_loop_count)
                    , v_column_list
                    , v_target_schema
                    , v_target_tablename);
            ELSIF (v_control_type IN ('text', 'uuid')) THEN
                EXECUTE format('WITH move_data AS (
                                        DELETE FROM %I.%I WHERE %s <= %s(%L) RETURNING %s )
                                    INSERT INTO %I.%I (%6$s) SELECT %6$s FROM move_data'
                    , v_parent_schema
                    , v_child_table
                    , v_partition_expression
                    , v_time_encoder
                    , v_child_min_time + (v_batch_interval_time * v_inner_loop_count)
                    , v_column_list
                    , v_target_schema
                    , v_target_tablename);
            END IF;
            GET DIAGNOSTICS v_rowcount = ROW_COUNT;
            v_total := v_total + v_rowcount;
            v_child_loop_total := v_child_loop_total + v_rowcount;
            IF v_jobmon_schema IS NOT NULL THEN
                PERFORM update_step(v_step_id, 'OK', format('Moved %s rows to target table.', v_child_loop_total));
            END IF;
            EXIT inner_child_loop WHEN v_rowcount = 0; -- exit before loop incr if table is empty
            v_inner_loop_count := v_inner_loop_count + 1;
            v_batch_loop_count := v_batch_loop_count + 1;

            -- Check again if table is empty and go to outer loop again to drop it if so
            IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
                EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
            ELSIF (v_control_type IN ('text', 'uuid')) THEN
                EXECUTE format('SELECT %s((SELECT min(%s::text) FROM %I.%I))', v_time_decoder, v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
            END IF;
            CONTINUE outer_child_loop WHEN v_child_min_time IS NULL;

        ELSIF v_control_type = 'id' THEN

            IF p_lock_wait > 0 THEN
                -- Up to 6 attempts (v_lock_iter 0..5), sleeping p_lock_wait / 5 after each failed one.
                v_lock_iter := 0;
                WHILE v_lock_iter <= 5 LOOP
                    v_lock_iter := v_lock_iter + 1;
                    BEGIN
                        EXECUTE format('SELECT * FROM %I.%I WHERE %I <= %L FOR UPDATE NOWAIT'
                            , v_parent_schema
                            , v_child_table
                            , v_control
                            , v_child_min_id + (v_batch_interval_id * v_inner_loop_count));
                       v_lock_obtained := TRUE;
                    EXCEPTION
                        WHEN lock_not_available THEN
                            PERFORM pg_sleep( p_lock_wait / 5.0 );
                            CONTINUE;
                    END;
                    EXIT WHEN v_lock_obtained;
                END LOOP;
                IF NOT v_lock_obtained THEN
                    RAISE NOTICE 'Unable to obtain lock on batch of rows to move';
                    -- Restore the caller's search_path before leaving early.
                    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
                    partitions_undone = -1;
                    RETURN;
                END IF;
            END IF;

            -- Get everything from the current child minimum up to the multiples of the given interval
            EXECUTE format('WITH move_data AS (
                                    DELETE FROM %I.%I WHERE %s <= %L RETURNING %s)
                                INSERT INTO %I.%I (%5$s) SELECT %5$s FROM move_data'
                , v_parent_schema
                , v_child_table
                , v_partition_expression
                , v_child_min_id + (v_batch_interval_id * v_inner_loop_count)
                , v_column_list
                , v_target_schema
                , v_target_tablename);
            GET DIAGNOSTICS v_rowcount = ROW_COUNT;
            v_total := v_total + v_rowcount;
            v_child_loop_total := v_child_loop_total + v_rowcount;
            IF v_jobmon_schema IS NOT NULL THEN
                PERFORM update_step(v_step_id, 'OK', format('Moved %s rows to target table.', v_child_loop_total));
            END IF;
            EXIT inner_child_loop WHEN v_rowcount = 0; -- exit before loop incr if table is empty
            v_inner_loop_count := v_inner_loop_count + 1;
            v_batch_loop_count := v_batch_loop_count + 1;

            -- Check again if table is empty and go to outer loop again to drop it if so
            EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_id;
            CONTINUE outer_child_loop WHEN v_child_min_id IS NULL;

        END IF; -- end v_control_type check

        EXIT outer_child_loop WHEN v_batch_loop_count >= p_loop_count; -- Exit outer FOR loop if p_loop_count is reached

    END LOOP inner_child_loop;
END LOOP outer_child_loop;

-- If no child tables remain, the partition set is fully undone: remove its configuration.
SELECT partition_tablename INTO v_child_table FROM @extschema@.show_partitions(p_parent_table, 'ASC', TRUE) LIMIT 1;

IF v_child_table IS NULL THEN
    DELETE FROM @extschema@.part_config WHERE parent_table = p_parent_table;

    -- Check if any other config entries still have this template table and don't remove if so
    -- Allows other sibling/parent tables to still keep using in case entire partition set isn't being undone
    SELECT count(*) INTO v_template_siblings FROM @extschema@.part_config WHERE template_table = v_template_table;

    SELECT n.nspname, c.relname
    INTO v_template_schema, v_template_tablename
    FROM pg_catalog.pg_class c
    JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    WHERE n.nspname = split_part(v_template_table, '.', 1)::name
    AND c.relname = split_part(v_template_table, '.', 2)::name;

    IF v_template_siblings = 0 AND v_template_tablename IS NOT NULL THEN
        EXECUTE format('DROP TABLE IF EXISTS %I.%I', v_template_schema, v_template_tablename);
    END IF;

    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, 'Removing config from pg_partman');
        PERFORM update_step(v_step_id, 'OK', 'Done');
    END IF;
END IF;

RAISE NOTICE 'Moved % row(s) to the target table. Removed % partitions.', v_total, v_undo_count;
IF v_jobmon_schema IS NOT NULL THEN
    v_step_id := add_step(v_job_id, 'Final stats');
    PERFORM update_step(v_step_id, 'OK', format('Moved %s row(s) to the target table. Removed %s partitions.', v_total, v_undo_count));
END IF;

IF v_jobmon_schema IS NOT NULL THEN
    PERFORM close_job(v_job_id);
END IF;

EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');

partitions_undone := v_undo_count;
rows_undone := v_total;

EXCEPTION
    WHEN OTHERS THEN
        GET STACKED DIAGNOSTICS ex_message = MESSAGE_TEXT,
                                ex_context = PG_EXCEPTION_CONTEXT,
                                ex_detail = PG_EXCEPTION_DETAIL,
                                ex_hint = PG_EXCEPTION_HINT;
        -- Record the failure in pg_jobmon, creating the job and/or step first if the error hit before they existed.
        -- Calls are schema-qualified here rather than relying on search_path.
        IF v_jobmon_schema IS NOT NULL THEN
            IF v_job_id IS NULL THEN
                -- Job name is passed through %L so a quote in p_parent_table cannot break the statement.
                EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, format('PARTMAN UNDO PARTITIONING: %s', p_parent_table)) INTO v_job_id;
                EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before job logging started'')', v_jobmon_schema, v_job_id) INTO v_step_id;
            ELSIF v_step_id IS NULL THEN
                EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before first step logged'')', v_jobmon_schema, v_job_id) INTO v_step_id;
            END IF;
            EXECUTE format('SELECT %I.update_step(%s, ''CRITICAL'', %L)', v_jobmon_schema, v_step_id, 'ERROR: '||coalesce(SQLERRM,'unknown'));
            EXECUTE format('SELECT %I.fail_job(%s)', v_jobmon_schema, v_job_id);
        END IF;
        RAISE EXCEPTION '%
CONTEXT: %
DETAIL: %
HINT: %', ex_message, ex_context, ex_detail, ex_hint;
END
$$;
|