1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493
|
CREATE FUNCTION @extschema@.undo_partition(
p_parent_table text
, p_target_table text
, p_loop_count int DEFAULT 1
, p_batch_interval text DEFAULT NULL
, p_keep_table boolean DEFAULT true
, p_lock_wait numeric DEFAULT 0
, p_ignored_columns text[] DEFAULT NULL
, p_drop_cascade boolean DEFAULT false
, OUT partitions_undone int
, OUT rows_undone bigint
)
RETURNS record
LANGUAGE plpgsql
AS $$
DECLARE
ex_context text;
ex_detail text;
ex_hint text;
ex_message text;
v_adv_lock boolean;
v_batch_interval_id bigint;
v_batch_interval_time interval;
v_batch_loop_count int := 0;
v_child_loop_total bigint := 0;
v_child_table text;
v_column_list text;
v_control text;
v_control_type text;
v_time_encoder text;
v_time_decoder text;
v_child_min_id bigint;
v_child_min_time timestamptz;
v_epoch text;
v_function_name text;
v_jobmon boolean;
v_jobmon_schema text;
v_job_id bigint;
v_inner_loop_count int;
v_lock_iter int := 1;
v_lock_obtained boolean := FALSE;
v_new_search_path text;
v_old_search_path text;
v_parent_schema text;
v_parent_tablename text;
v_partition_expression text;
v_partition_interval text;
v_row record;
v_rowcount bigint;
v_sql text;
v_step_id bigint;
v_sub_count int;
v_target_schema text;
v_target_tablename text;
v_template_schema text;
v_template_siblings int;
v_template_table text;
v_template_tablename text;
v_total bigint := 0;
v_trig_name text;
v_undo_count int := 0;
BEGIN
/*
* Moves data to new, target table since data cannot be moved elsewhere in the same partition set.
* Leaves old parent table as is and does not change name of new table.
*/
v_adv_lock := pg_try_advisory_xact_lock(hashtext('pg_partman undo_partition'));
IF v_adv_lock = 'false' THEN
RAISE NOTICE 'undo_partition already running.';
partitions_undone = -1;
RETURN;
END IF;
IF p_parent_table = p_target_table THEN
RAISE EXCEPTION 'Target table cannot be the same as the parent table';
END IF;
SELECT partition_interval::text
, control
, time_encoder
, time_decoder
, jobmon
, epoch
, template_table
INTO v_partition_interval
, v_control
, v_time_encoder
, v_time_decoder
, v_jobmon
, v_epoch
, v_template_table
FROM @extschema@.part_config
WHERE parent_table = p_parent_table;
IF v_control IS NULL THEN
RAISE EXCEPTION 'No configuration found for pg_partman for given parent table: %', p_parent_table;
END IF;
IF p_target_table IS NULL THEN
RAISE EXCEPTION 'The p_target_table option must be set when undoing a partitioned table';
END IF;
SELECT n.nspname, c.relname
INTO v_parent_schema, v_parent_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_parent_table, '.', 1)::name
AND c.relname = split_part(p_parent_table, '.', 2)::name;
IF v_parent_tablename IS NULL THEN
RAISE EXCEPTION 'Given parent table not found in system catalogs: %', p_parent_table;
END IF;
SELECT general_type INTO v_control_type FROM @extschema@.check_control_type(v_parent_schema, v_parent_tablename, v_control);
IF v_control_type IN ('time', 'text', 'uuid') OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
IF p_batch_interval IS NULL THEN
v_batch_interval_time := v_partition_interval::interval;
ELSE
v_batch_interval_time := p_batch_interval::interval;
END IF;
ELSIF v_control_type = 'id' THEN
IF p_batch_interval IS NULL THEN
v_batch_interval_id := v_partition_interval::bigint;
ELSE
v_batch_interval_id := p_batch_interval::bigint;
END IF;
ELSE
RAISE EXCEPTION 'Data type of control column in given partition set must be either date/time or integer.';
END IF;
SELECT current_setting('search_path') INTO v_old_search_path;
IF length(v_old_search_path) > 0 THEN
v_new_search_path := '@extschema@,pg_temp,'||v_old_search_path;
ELSE
v_new_search_path := '@extschema@,pg_temp';
END IF;
IF v_jobmon THEN
SELECT nspname INTO v_jobmon_schema FROM pg_catalog.pg_namespace n, pg_catalog.pg_extension e WHERE e.extname = 'pg_jobmon'::name AND e.extnamespace = n.oid;
IF v_jobmon_schema IS NOT NULL THEN
v_new_search_path := format('%s,%s',v_jobmon_schema, v_new_search_path);
END IF;
END IF;
EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_new_search_path, 'false');
-- Check if any child tables are themselves partitioned or part of an inheritance tree. Prevent undo at this level if so.
-- Need to lock child tables at all levels before multi-level undo can be performed safely.
FOR v_row IN
SELECT partition_schemaname, partition_tablename FROM @extschema@.show_partitions(p_parent_table)
LOOP
SELECT count(*) INTO v_sub_count
FROM pg_catalog.pg_inherits i
JOIN pg_catalog.pg_class c ON i.inhparent = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE c.relname = v_row.partition_tablename::name
AND n.nspname = v_row.partition_schemaname::name;
IF v_sub_count > 0 THEN
RAISE EXCEPTION 'Child table for this parent has child table(s) itself (%). Run undo partitioning on this table to ensure all data is properly moved to target table', v_row.partition_schemaname||'.'||v_row.partition_tablename;
END IF;
END LOOP;
SELECT n.nspname, c.relname
INTO v_target_schema, v_target_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_target_table, '.', 1)::name
AND c.relname = split_part(p_target_table, '.', 2)::name;
IF v_target_tablename IS NULL THEN
RAISE EXCEPTION 'Given target table not found in system catalogs: %', p_target_table;
END IF;
IF v_jobmon_schema IS NOT NULL THEN
v_job_id := add_job(format('PARTMAN UNDO PARTITIONING: %s', p_parent_table));
v_step_id := add_step(v_job_id, format('Undoing partitioning for table %s', p_parent_table));
END IF;
v_partition_expression := CASE
WHEN v_epoch = 'seconds' THEN format('to_timestamp(%I)', v_control)
WHEN v_epoch = 'milliseconds' THEN format('to_timestamp((%I/1000)::float)', v_control)
WHEN v_epoch = 'microseconds' THEN format('to_timestamp((%I/1000000)::float)', v_control)
WHEN v_epoch = 'nanoseconds' THEN format('to_timestamp((%I/1000000000)::float)', v_control)
ELSE format('%I', v_control)
END;
-- Stops new time partitions from being made as well as stopping child tables from being dropped if they were configured with a retention period.
UPDATE @extschema@.part_config SET undo_in_progress = true WHERE parent_table = p_parent_table;
IF v_jobmon_schema IS NOT NULL THEN
IF (v_trig_name IS NOT NULL OR v_function_name IS NOT NULL) THEN
PERFORM update_step(v_step_id, 'OK', 'Stopped partition creation process. Removed trigger & trigger function');
ELSE
PERFORM update_step(v_step_id, 'OK', 'Stopped partition creation process.');
END IF;
END IF;
-- Generate column list to use in SELECT/INSERT statements below. Allows for exclusion of GENERATED (or any other desired) columns.
SELECT string_agg(quote_ident(attname), ',')
INTO v_column_list
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_class c ON a.attrelid = c.oid
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = v_target_schema
AND c.relname = v_target_tablename
AND a.attnum > 0
AND a.attisdropped = false
AND attname <> ALL(COALESCE(p_ignored_columns, ARRAY[]::text[]));
<<outer_child_loop>>
LOOP
-- Get ordered list of child table in set. Store in variable one at a time per loop until none are left or batch count is reached.
-- This easily allows it to loop over same child table until empty or move onto next child table after it's dropped
-- Include the default table to ensure all data there is removed as well
SELECT partition_tablename INTO v_child_table FROM @extschema@.show_partitions(p_parent_table, 'ASC', p_include_default := TRUE) LIMIT 1;
EXIT outer_child_loop WHEN v_child_table IS NULL;
IF v_jobmon_schema IS NOT NULL THEN
v_step_id := add_step(v_job_id, format('Removing child partition: %s.%s', v_parent_schema, v_child_table));
END IF;
IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
ELSIF (v_control_type IN ('text', 'uuid')) THEN
--- This can pass NULL to decoder function
EXECUTE format('SELECT %s((SELECT min(%s::text) FROM %I.%I))', v_time_decoder, v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
ELSIF v_control_type = 'id' THEN
EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_id;
END IF;
IF v_child_min_time IS NULL AND v_child_min_id IS NULL THEN
-- No rows left in this child table. Remove from partition set.
-- lockwait timeout for table drop
IF p_lock_wait > 0 THEN
v_lock_iter := 0;
WHILE v_lock_iter <= 5 LOOP
v_lock_iter := v_lock_iter + 1;
BEGIN
EXECUTE format('LOCK TABLE ONLY %I.%I IN ACCESS EXCLUSIVE MODE NOWAIT', v_parent_schema, v_child_table);
v_lock_obtained := TRUE;
EXCEPTION
WHEN lock_not_available THEN
PERFORM pg_sleep( p_lock_wait / 5.0 );
CONTINUE;
END;
EXIT WHEN v_lock_obtained;
END LOOP;
IF NOT v_lock_obtained THEN
RAISE NOTICE 'Unable to obtain lock on child table for removal from partition set';
partitions_undone = -1;
RETURN;
END IF;
END IF; -- END p_lock_wait IF
v_lock_obtained := FALSE; -- reset for reuse later
v_sql := format('ALTER TABLE %I.%I DETACH PARTITION %I.%I'
, v_parent_schema
, v_parent_tablename
, v_parent_schema
, v_child_table);
EXECUTE v_sql;
IF p_keep_table = false THEN
v_sql := 'DROP TABLE %I.%I';
IF p_drop_cascade THEN
v_sql := v_sql || ' CASCADE';
END IF;
EXECUTE format(v_sql, v_parent_schema, v_child_table);
IF v_jobmon_schema IS NOT NULL THEN
PERFORM update_step(v_step_id, 'OK', format('Child table DROPPED. Moved %s rows to target table', v_child_loop_total));
END IF;
ELSE
IF v_jobmon_schema IS NOT NULL THEN
PERFORM update_step(v_step_id, 'OK', format('Child table DETACHED/UNINHERITED from parent, not DROPPED. Moved %s rows to target table', v_child_loop_total));
END IF;
END IF;
v_undo_count := v_undo_count + 1;
EXIT outer_child_loop WHEN v_batch_loop_count >= p_loop_count; -- Exit outer FOR loop if p_loop_count is reached
CONTINUE outer_child_loop; -- skip data moving steps below
END IF;
v_inner_loop_count := 1;
v_child_loop_total := 0;
<<inner_child_loop>>
LOOP
IF v_control_type IN ('time', 'text', 'uuid') OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
-- do some locking with timeout, if required
IF p_lock_wait > 0 THEN
v_lock_iter := 0;
WHILE v_lock_iter <= 5 LOOP
v_lock_iter := v_lock_iter + 1;
BEGIN
IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
EXECUTE format('SELECT * FROM %I.%I WHERE %I <= %L FOR UPDATE NOWAIT'
, v_parent_schema
, v_child_table
, v_control
, v_child_min_time + (v_batch_interval_time * v_inner_loop_count));
ELSIF (v_control_type IN ('text', 'uuid')) THEN
EXECUTE format('SELECT * FROM %I.%I WHERE %I <= %s(%L) FOR UPDATE NOWAIT'
, v_parent_schema
, v_child_table
, v_control
, v_time_encoder
, v_child_min_time + (v_batch_interval_time * v_inner_loop_count));
END IF;
v_lock_obtained := TRUE;
EXCEPTION
WHEN lock_not_available THEN
PERFORM pg_sleep( p_lock_wait / 5.0 );
CONTINUE;
END;
EXIT WHEN v_lock_obtained;
END LOOP;
IF NOT v_lock_obtained THEN
RAISE NOTICE 'Unable to obtain lock on batch of rows to move';
partitions_undone = -1;
RETURN;
END IF;
END IF;
-- Get everything from the current child minimum up to the multiples of the given interval
IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
EXECUTE format('WITH move_data AS (
DELETE FROM %I.%I WHERE %s <= %L RETURNING %s )
INSERT INTO %I.%I (%5$s) SELECT %5$s FROM move_data'
, v_parent_schema
, v_child_table
, v_partition_expression
, v_child_min_time + (v_batch_interval_time * v_inner_loop_count)
, v_column_list
, v_target_schema
, v_target_tablename);
ELSIF (v_control_type IN ('text', 'uuid')) THEN
EXECUTE format('WITH move_data AS (
DELETE FROM %I.%I WHERE %s <= %s(%L) RETURNING %s )
INSERT INTO %I.%I (%6$s) SELECT %6$s FROM move_data'
, v_parent_schema
, v_child_table
, v_partition_expression
, v_time_encoder
, v_child_min_time + (v_batch_interval_time * v_inner_loop_count)
, v_column_list
, v_target_schema
, v_target_tablename);
END IF;
GET DIAGNOSTICS v_rowcount = ROW_COUNT;
v_total := v_total + v_rowcount;
v_child_loop_total := v_child_loop_total + v_rowcount;
IF v_jobmon_schema IS NOT NULL THEN
PERFORM update_step(v_step_id, 'OK', format('Moved %s rows to target table.', v_child_loop_total));
END IF;
EXIT inner_child_loop WHEN v_rowcount = 0; -- exit before loop incr if table is empty
v_inner_loop_count := v_inner_loop_count + 1;
v_batch_loop_count := v_batch_loop_count + 1;
-- Check again if table is empty and go to outer loop again to drop it if so
IF v_control_type = 'time' OR (v_control_type = 'id' AND v_epoch <> 'none') THEN
EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
ELSIF (v_control_type IN ('text', 'uuid')) THEN
EXECUTE format('SELECT %s((SELECT min(%s::text) FROM %I.%I))', v_time_decoder, v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_time;
END IF;
CONTINUE outer_child_loop WHEN v_child_min_time IS NULL;
ELSIF v_control_type = 'id' THEN
IF p_lock_wait > 0 THEN
v_lock_iter := 0;
WHILE v_lock_iter <= 5 LOOP
v_lock_iter := v_lock_iter + 1;
BEGIN
EXECUTE format('SELECT * FROM %I.%I WHERE %I <= %L FOR UPDATE NOWAIT'
, v_parent_schema
, v_child_table
, v_control
, v_child_min_id + (v_batch_interval_id * v_inner_loop_count));
v_lock_obtained := TRUE;
EXCEPTION
WHEN lock_not_available THEN
PERFORM pg_sleep( p_lock_wait / 5.0 );
CONTINUE;
END;
EXIT WHEN v_lock_obtained;
END LOOP;
IF NOT v_lock_obtained THEN
RAISE NOTICE 'Unable to obtain lock on batch of rows to move';
partitions_undone = -1;
RETURN;
END IF;
END IF;
-- Get everything from the current child minimum up to the multiples of the given interval
EXECUTE format('WITH move_data AS (
DELETE FROM %I.%I WHERE %s <= %L RETURNING %s)
INSERT INTO %I.%I (%5$s) SELECT %5$s FROM move_data'
, v_parent_schema
, v_child_table
, v_partition_expression
, v_child_min_id + (v_batch_interval_id * v_inner_loop_count)
, v_column_list
, v_target_schema
, v_target_tablename);
GET DIAGNOSTICS v_rowcount = ROW_COUNT;
v_total := v_total + v_rowcount;
v_child_loop_total := v_child_loop_total + v_rowcount;
IF v_jobmon_schema IS NOT NULL THEN
PERFORM update_step(v_step_id, 'OK', format('Moved %s rows to target table.', v_child_loop_total));
END IF;
EXIT inner_child_loop WHEN v_rowcount = 0; -- exit before loop incr if table is empty
v_inner_loop_count := v_inner_loop_count + 1;
v_batch_loop_count := v_batch_loop_count + 1;
-- Check again if table is empty and go to outer loop again to drop it if so
EXECUTE format('SELECT min(%s) FROM %I.%I', v_partition_expression, v_parent_schema, v_child_table) INTO v_child_min_id;
CONTINUE outer_child_loop WHEN v_child_min_id IS NULL;
END IF; -- end v_control_type check
EXIT outer_child_loop WHEN v_batch_loop_count >= p_loop_count; -- Exit outer FOR loop if p_loop_count is reached
END LOOP inner_child_loop;
END LOOP outer_child_loop;
SELECT partition_tablename INTO v_child_table FROM @extschema@.show_partitions(p_parent_table, 'ASC', TRUE) LIMIT 1;
IF v_child_table IS NULL THEN
DELETE FROM @extschema@.part_config WHERE parent_table = p_parent_table;
-- Check if any other config entries still have this template table and don't remove if so
-- Allows other sibling/parent tables to still keep using in case entire partition set isn't being undone
SELECT count(*) INTO v_template_siblings FROM @extschema@.part_config WHERE template_table = v_template_table;
SELECT n.nspname, c.relname
INTO v_template_schema, v_template_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(v_template_table, '.', 1)::name
AND c.relname = split_part(v_template_table, '.', 2)::name;
IF v_template_siblings = 0 AND v_template_tablename IS NOT NULL THEN
EXECUTE format('DROP TABLE IF EXISTS %I.%I', v_template_schema, v_template_tablename);
END IF;
IF v_jobmon_schema IS NOT NULL THEN
v_step_id := add_step(v_job_id, 'Removing config from pg_partman');
PERFORM update_step(v_step_id, 'OK', 'Done');
END IF;
END IF;
RAISE NOTICE 'Moved % row(s) to the target table. Removed % partitions.', v_total, v_undo_count;
IF v_jobmon_schema IS NOT NULL THEN
v_step_id := add_step(v_job_id, 'Final stats');
PERFORM update_step(v_step_id, 'OK', format('Moved %s row(s) to the target table. Removed %s partitions.', v_total, v_undo_count));
END IF;
IF v_jobmon_schema IS NOT NULL THEN
PERFORM close_job(v_job_id);
END IF;
EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
partitions_undone := v_undo_count;
rows_undone := v_total;
EXCEPTION
WHEN OTHERS THEN
GET STACKED DIAGNOSTICS ex_message = MESSAGE_TEXT,
ex_context = PG_EXCEPTION_CONTEXT,
ex_detail = PG_EXCEPTION_DETAIL,
ex_hint = PG_EXCEPTION_HINT;
IF v_jobmon_schema IS NOT NULL THEN
IF v_job_id IS NULL THEN
EXECUTE format('SELECT %I.add_job(''PARTMAN UNDO PARTITIONING: %s'')', v_jobmon_schema, p_parent_table) INTO v_job_id;
EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before job logging started'')', v_jobmon_schema, v_job_id, p_parent_table) INTO v_step_id;
ELSIF v_step_id IS NULL THEN
EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before first step logged'')', v_jobmon_schema, v_job_id) INTO v_step_id;
END IF;
EXECUTE format('SELECT %I.update_step(%s, ''CRITICAL'', %L)', v_jobmon_schema, v_step_id, 'ERROR: '||coalesce(SQLERRM,'unknown'));
EXECUTE format('SELECT %I.fail_job(%s)', v_jobmon_schema, v_job_id);
END IF;
RAISE EXCEPTION '%
CONTEXT: %
DETAIL: %
HINT: %', ex_message, ex_context, ex_detail, ex_hint;
END
$$;
|