1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
|
CREATE FUNCTION @extschema@.apply_constraints(
    p_parent_table text
    , p_child_table text DEFAULT NULL
    , p_analyze boolean DEFAULT FALSE
    , p_job_id bigint DEFAULT NULL
)
    RETURNS void
    LANGUAGE plpgsql
    AS $$
DECLARE

ex_context                      text;
ex_detail                       text;
ex_hint                         text;
ex_message                      text;
v_child_exists                  text;
v_child_tablename               text;
v_child_value                   text;
v_col                           text;
v_constraint_cols               text[];
v_constraint_name               text;
v_constraint_valid              boolean;
v_constraint_values             record;
v_control                       text;
v_control_type                  text;
v_existing_constraint_name      text;
v_job_id                        bigint;
v_jobmon                        boolean;
v_jobmon_schema                 text;
v_new_search_path               text;
v_old_search_path               text;
v_optimize_constraint           int;
v_optimize_counter              int := 0;
v_row_max_value                 record;
v_parent_schema                 text;
v_parent_table                  text;
v_parent_tablename              text;
v_sql                           text;
v_step_id                       bigint;

BEGIN
/*
 * Apply constraints managed by partman extension
 *
 * For one child table of the given partition set, adds a CHECK constraint on every column listed in
 * part_config.constraint_cols, bounding the column to the min/max values currently stored in that child.
 * Constraint names are built from 'partmanconstr_' || child table name, with '_' || column name as suffix.
 *
 * Parameters:
 *   p_parent_table - parent table; matched against part_config.parent_table and split on '.' into schema and table name.
 *   p_child_table  - optional target child. Only the part after the '.' is used; the child is looked up in the
 *                    parent's schema. When NULL the target is chosen automatically (see below).
 *   p_analyze      - when true, runs ANALYZE on the parent table after the constraints have been processed.
 *   p_job_id       - optional pg_jobmon job id to log steps under. When NULL (and jobmon logging is active)
 *                    a new job is created by this function.
 *
 * Returns without doing anything when no part_config row with a non-null constraint_cols exists for the
 * parent, or when the target child table cannot be found.
 *
 * Any exception is logged to pg_jobmon (when its schema was resolved) and re-raised with the original
 * message, context, detail and hint folded into the new message text.
 */

SELECT parent_table
    , control
    , optimize_constraint
    , constraint_cols
    , jobmon
    , constraint_valid
INTO v_parent_table
    , v_control
    , v_optimize_constraint
    , v_constraint_cols
    , v_jobmon
    , v_constraint_valid
FROM @extschema@.part_config
WHERE parent_table = p_parent_table
AND constraint_cols IS NOT NULL;

IF v_constraint_cols IS NULL THEN
    RAISE DEBUG 'apply_constraints: Given parent table (%) not set up for constraint management (constraint_cols is NULL)', p_parent_table;
    -- Returns silently to allow this function to be simply called by maintenance processes without having to check if config options are set.
    RETURN;
END IF;

-- Resolve the schema and table name of the parent from the catalog
SELECT schemaname, tablename
INTO v_parent_schema, v_parent_tablename
FROM pg_catalog.pg_tables
WHERE schemaname = split_part(v_parent_table, '.', 1)::name
AND tablename = split_part(v_parent_table, '.', 2)::name;

-- NOTE(review): v_control_type is not read anywhere below. The call is kept in case check_control_type
-- has effects that are not visible from this function - confirm before removing.
SELECT general_type INTO v_control_type FROM @extschema@.check_control_type(v_parent_schema, v_parent_tablename, v_control);

-- Put the extension schema (and the pg_jobmon schema, if used) in front of the caller's search_path
-- so that unqualified calls such as add_job()/add_step() resolve. The old value is restored on every normal exit.
SELECT current_setting('search_path') INTO v_old_search_path;
IF length(v_old_search_path) > 0 THEN
    v_new_search_path := '@extschema@,pg_temp,'||v_old_search_path;
ELSE
    v_new_search_path := '@extschema@,pg_temp';
END IF;
IF v_jobmon THEN
    SELECT n.nspname
    INTO v_jobmon_schema
    FROM pg_catalog.pg_extension e
    INNER JOIN pg_catalog.pg_namespace n ON n.oid = e.extnamespace
    WHERE e.extname = 'pg_jobmon'::name;
    IF v_jobmon_schema IS NOT NULL THEN
        v_new_search_path := format('%s,%s',v_jobmon_schema, v_new_search_path);
    END IF;
END IF;
EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_new_search_path, 'false');

-- From here on "v_jobmon_schema IS NOT NULL" is the switch for all job logging
IF v_jobmon_schema IS NOT NULL THEN
    IF p_job_id IS NULL THEN
        v_job_id := add_job(format('PARTMAN CREATE CONSTRAINT: %s', v_parent_table));
    ELSE
        v_job_id := p_job_id;
    END IF;
END IF;

-- If p_child_table is null, figure out the partition that is the one right before the optimize_constraint value backwards.
IF p_child_table IS NULL THEN
    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, 'Applying additional constraints: Automatically determining most recent child on which to apply constraints');
    END IF;

    -- Loop through child tables starting from highest to get a value from the highest non-empty partition in the set
    -- Once a child table with a value is found, go back <optimize_constraint> + 1 children to make the constraint on that child
    FOR v_row_max_value IN
        SELECT partition_schemaname, partition_tablename FROM @extschema@.show_partitions(p_parent_table, 'DESC', false)
    LOOP
        IF v_child_value IS NULL THEN
            -- Probe one row; a child with no rows (or a NULL control value in the returned row) leaves v_child_value NULL
            EXECUTE format('SELECT %I::text FROM %I.%I LIMIT 1'
                        , v_control
                        , v_row_max_value.partition_schemaname
                        , v_row_max_value.partition_tablename
                    ) INTO v_child_value;
        ELSE
            -- A value was found in an earlier iteration; count the children that follow it
            v_optimize_counter := v_optimize_counter + 1;
            IF v_optimize_counter > v_optimize_constraint THEN
                v_child_tablename := v_row_max_value.partition_tablename;
                EXIT;
            END IF;
        END IF;
    END LOOP;

    RAISE DEBUG 'apply_constraint: v_parent_tablename: %, v_child_tablename: %, v_optimize_counter: %'
        , v_parent_tablename, v_child_tablename, v_optimize_counter;

    IF v_jobmon_schema IS NOT NULL THEN
        PERFORM update_step(v_step_id, 'OK', format('Target child table: %s.%s', v_parent_schema, v_child_tablename));
    END IF;
ELSE
    -- Only the table part of the given name is used; the schema is taken from the parent
    v_child_tablename := split_part(p_child_table, '.', 2);
END IF;

IF v_jobmon_schema IS NOT NULL THEN
    v_step_id := add_step(v_job_id, 'Applying additional constraints: Checking if target child table exists');
END IF;

SELECT tablename
INTO v_child_exists
FROM pg_catalog.pg_tables
WHERE schemaname = v_parent_schema::name
AND tablename = v_child_tablename::name;

IF v_child_exists IS NULL THEN
    IF v_jobmon_schema IS NOT NULL THEN
        PERFORM update_step(v_step_id, 'NOTICE', format('Target child table (%s) does not exist. Skipping constraint creation.', v_child_tablename));
        -- Only close the job if this function created it
        IF p_job_id IS NULL THEN
            PERFORM close_job(v_job_id);
        END IF;
    END IF;
    RAISE DEBUG 'Target child table (%) does not exist. Skipping constraint creation.', v_child_tablename;
    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
    RETURN;
ELSE
    IF v_jobmon_schema IS NOT NULL THEN
        PERFORM update_step(v_step_id, 'OK', 'Done');
    END IF;
END IF;

FOREACH v_col IN ARRAY v_constraint_cols
LOOP
    -- Look for an existing partman-named CHECK constraint on the child that includes this column.
    -- When no row matches, SELECT INTO sets the variable to NULL, so no reset is needed between iterations.
    -- Note that '_' in the LIKE pattern is a single-character wildcard, not a literal underscore.
    SELECT con.conname
    INTO v_existing_constraint_name
    FROM pg_catalog.pg_constraint con
    INNER JOIN pg_catalog.pg_class c ON c.oid = con.conrelid
    INNER JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    INNER JOIN pg_catalog.pg_attribute a ON con.conrelid = a.attrelid
    WHERE c.relname = v_child_tablename::name
        AND n.nspname = v_parent_schema::name
        AND con.conname LIKE 'partmanconstr_%'
        AND con.contype = 'c'
        AND a.attname = v_col::name
        AND ARRAY[a.attnum] OPERATOR(pg_catalog.<@) con.conkey
        AND a.attisdropped = false;

    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, format('Applying additional constraints: Applying new constraint on column: %s', v_col));
    END IF;

    IF v_existing_constraint_name IS NOT NULL THEN
        IF v_jobmon_schema IS NOT NULL THEN
            PERFORM update_step(v_step_id, 'NOTICE', format('Partman managed constraint already exists on this table (%s) and column (%s). Skipping creation.', v_child_tablename, v_col));
        END IF;
        RAISE DEBUG 'Partman managed constraint already exists on this table (%) and column (%). Skipping creation.', v_child_tablename, v_col ;
        CONTINUE;
    END IF;

    -- Ensure column name gets put on end of constraint name to help avoid naming conflicts
    v_constraint_name := @extschema@.check_name_length('partmanconstr_'||v_child_tablename, p_suffix := '_'||v_col);

    EXECUTE format('SELECT min(%I)::text AS min, max(%I)::text AS max FROM %I.%I', v_col, v_col, v_parent_schema, v_child_tablename) INTO v_constraint_values;

    -- A record tests IS NOT NULL only when every field is non-null, i.e. both min and max were found
    IF v_constraint_values IS NOT NULL THEN
        v_sql := format('ALTER TABLE %I.%I ADD CONSTRAINT %I CHECK (%I >= %L AND %I <= %L)'
                            , v_parent_schema
                            , v_child_tablename
                            , v_constraint_name
                            , v_col
                            , v_constraint_values.min
                            , v_col
                            , v_constraint_values.max);

        -- constraint_valid = false in the config means the constraint is added without validating existing rows
        IF v_constraint_valid = false THEN
            v_sql := format('%s NOT VALID', v_sql);
        END IF;

        RAISE DEBUG 'Constraint creation query: %', v_sql;
        EXECUTE v_sql;

        IF v_jobmon_schema IS NOT NULL THEN
            PERFORM update_step(v_step_id, 'OK', format('New constraint created: %s', v_sql));
        END IF;
    ELSE
        RAISE DEBUG 'Given column (%) contains all NULLs. No constraint created', v_col;
        IF v_jobmon_schema IS NOT NULL THEN
            PERFORM update_step(v_step_id, 'NOTICE', format('Given column (%s) contains all NULLs. No constraint created', v_col));
        END IF;
    END IF;

END LOOP;

IF p_analyze THEN
    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, format('Applying additional constraints: Running analyze on partition set: %s', v_parent_table));
    END IF;
    RAISE DEBUG 'Running analyze on partition set: %', v_parent_table;

    EXECUTE format('ANALYZE %I.%I', v_parent_schema, v_parent_tablename);

    IF v_jobmon_schema IS NOT NULL THEN
        PERFORM update_step(v_step_id, 'OK', 'Done');
    END IF;
END IF;

-- NOTE(review): the job is closed here even when the caller supplied p_job_id, whereas the
-- "child does not exist" exit above only closes jobs this function created. Behaviour is left
-- unchanged because callers are not visible here - confirm which of the two is intended.
IF v_jobmon_schema IS NOT NULL THEN
    PERFORM close_job(v_job_id);
END IF;

EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');

EXCEPTION
    WHEN OTHERS THEN
        GET STACKED DIAGNOSTICS ex_message = MESSAGE_TEXT,
                                ex_context = PG_EXCEPTION_CONTEXT,
                                ex_detail = PG_EXCEPTION_DETAIL,
                                ex_hint = PG_EXCEPTION_HINT;
        IF v_jobmon_schema IS NOT NULL THEN
            -- jobmon functions are called schema-qualified through dynamic SQL in this handler
            IF v_job_id IS NULL THEN
                -- The job name is passed through %L so that a parent table name containing a quote
                -- cannot break (or inject into) the statement while an error is being reported
                EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, format('PARTMAN CREATE CONSTRAINT: %s', p_parent_table)) INTO v_job_id;
                EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before job logging started'')', v_jobmon_schema, v_job_id) INTO v_step_id;
            ELSIF v_step_id IS NULL THEN
                EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before first step logged'')', v_jobmon_schema, v_job_id) INTO v_step_id;
            END IF;
            EXECUTE format('SELECT %I.update_step(%s, ''CRITICAL'', %L)', v_jobmon_schema, v_step_id, 'ERROR: '||coalesce(SQLERRM,'unknown'));
            EXECUTE format('SELECT %I.fail_job(%s)', v_jobmon_schema, v_job_id);
        END IF;
        -- Re-raise with the diagnostics of the original error folded into the message text
        RAISE EXCEPTION '%
CONTEXT: %
DETAIL: %
HINT: %', ex_message, ex_context, ex_detail, ex_hint;
END
$$;
|