1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
CREATE PROCEDURE @extschema@.partition_data_proc (
p_parent_table text
, p_loop_count int DEFAULT NULL
, p_interval text DEFAULT NULL
, p_lock_wait int DEFAULT 0
, p_lock_wait_tries int DEFAULT 10
, p_wait int DEFAULT 1
, p_order text DEFAULT 'ASC'
, p_source_table text DEFAULT NULL
, p_ignored_columns text[] DEFAULT NULL
, p_quiet boolean DEFAULT false
)
LANGUAGE plpgsql
AS $$
DECLARE
v_adv_lock boolean;
v_control text;
v_control_type text;
v_epoch text;
v_is_autovac_off boolean := false;
v_lockwait_count int := 0;
v_loop_count int := 0;
v_parent_schemaname text;
v_parent_tablename text;
v_rows_moved bigint;
v_source_schemaname text;
v_source_tablename text;
v_sql text;
v_total bigint := 0;
BEGIN
v_adv_lock := pg_try_advisory_xact_lock(hashtext('pg_partman partition_data_proc'), hashtext(p_parent_table));
IF v_adv_lock = 'false' THEN
RAISE NOTICE 'Partman partition_data_proc already running for given parent table: %.', p_parent_table;
RETURN;
END IF;
SELECT control, epoch
INTO v_control, v_epoch
FROM @extschema@.part_config
WHERE parent_table = p_parent_table;
IF NOT FOUND THEN
RAISE EXCEPTION 'ERROR: No entry in part_config found for given table: %', p_parent_table;
END IF;
SELECT n.nspname, c.relname INTO v_parent_schemaname, v_parent_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_parent_table, '.', 1)::name
AND c.relname = split_part(p_parent_table, '.', 2)::name;
IF v_parent_tablename IS NULL THEN
RAISE EXCEPTION 'Unable to find given parent table in system catalogs. Ensure it is schema qualified: %', p_parent_table;
END IF;
IF p_source_table IS NOT NULL THEN
SELECT n.nspname, c.relname INTO v_source_schemaname, v_source_tablename
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = split_part(p_source_table, '.', 1)::name
AND c.relname = split_part(p_source_table, '.', 2)::name;
IF v_source_tablename IS NULL THEN
RAISE EXCEPTION 'Unable to find given source table in system catalogs. Ensure it is schema qualified: %', p_source_table;
END IF;
END IF;
SELECT general_type INTO v_control_type FROM @extschema@.check_control_type(v_parent_schemaname, v_parent_tablename, v_control);
IF v_control_type = 'id' AND v_epoch <> 'none' THEN
v_control_type := 'time';
END IF;
/*
-- Currently no way to catch exception and reset autovac settings back to normal. Until I can do that, leaving this feature out for now
-- Leaving the functions to turn off/reset in to let people do that manually if desired
IF p_autovacuum_on = false THEN -- Add this parameter back to definition when this is working
-- Turn off autovac for parent, source table if set, and all child tables
v_is_autovac_off := @extschema@.autovacuum_off(v_parent_schemaname, v_parent_tablename, v_source_schema, v_source_tablename);
COMMIT;
END IF;
*/
v_sql := format('SELECT %s.partition_data_%s (p_parent_table := %L
, p_lock_wait := %L
, p_order := %L
, p_analyze := false'
, '@extschema@', v_control_type, p_parent_table, p_lock_wait, p_order);
IF p_interval IS NOT NULL THEN
v_sql := v_sql || format(', p_batch_interval := %L', p_interval);
END IF;
IF p_source_table IS NOT NULL THEN
v_sql := v_sql || format(', p_source_table := %L', p_source_table);
END IF;
IF p_ignored_columns IS NOT NULL THEN
v_sql := v_sql || format(', p_ignored_columns := %L', p_ignored_columns);
END IF;
v_sql := v_sql || ')';
RAISE DEBUG 'partition_data sql: %', v_sql;
LOOP
EXECUTE v_sql INTO v_rows_moved;
-- If lock wait timeout, do not increment the counter
IF v_rows_moved != -1 THEN
v_loop_count := v_loop_count + 1;
v_total := v_total + v_rows_moved;
v_lockwait_count := 0;
ELSE
v_lockwait_count := v_lockwait_count + 1;
IF v_lockwait_count > p_lock_wait_tries THEN
RAISE EXCEPTION 'Quitting due to inability to get lock on next batch of rows to be moved';
END IF;
END IF;
IF p_quiet = false THEN
IF v_rows_moved > 0 THEN
RAISE NOTICE 'Loop: %, Rows moved: %', v_loop_count, v_rows_moved;
ELSIF v_rows_moved = -1 THEN
RAISE NOTICE 'Unable to obtain row locks for data to be moved. Trying again...';
END IF;
END IF;
-- If no rows left or given loop argument limit is reached
IF v_rows_moved = 0 OR (p_loop_count > 0 AND v_loop_count >= p_loop_count) THEN
EXIT;
END IF;
COMMIT;
PERFORM pg_sleep(p_wait);
RAISE DEBUG 'v_rows_moved: %, v_loop_count: %, v_total: %, v_lockwait_count: %, p_wait: %', p_wait, v_rows_moved, v_loop_count, v_total, v_lockwait_count;
END LOOP;
/*
IF v_is_autovac_off = true THEN
-- Reset autovac back to default if it was turned off by this procedure
PERFORM @extschema@.autovacuum_reset(v_parent_schemaname, v_parent_tablename, v_source_schema, v_source_tablename);
COMMIT;
END IF;
*/
IF p_quiet = false THEN
RAISE NOTICE 'Total rows moved: %', v_total;
END IF;
RAISE NOTICE 'Ensure to VACUUM ANALYZE the parent (and source table if used) after partitioning data';
/* Leaving here until I can figure out what's wrong with procedures and exception handling
EXCEPTION
WHEN QUERY_CANCELED THEN
ROLLBACK;
-- Reset autovac back to default if it was turned off by this procedure
IF v_is_autovac_off = true THEN
PERFORM @extschema@.autovacuum_reset(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
END IF;
RAISE EXCEPTION '%', SQLERRM;
WHEN OTHERS THEN
ROLLBACK;
-- Reset autovac back to default if it was turned off by this procedure
IF v_is_autovac_off = true THEN
PERFORM @extschema@.autovacuum_reset(v_parent_schema, v_parent_tablename, v_source_schema, v_source_tablename);
END IF;
RAISE EXCEPTION '%', SQLERRM;
*/
END;
$$;
|