
|
CREATE FUNCTION @extschema@.apply_constraints(
    p_parent_table text
    , p_child_table text DEFAULT NULL
    , p_analyze boolean DEFAULT FALSE
    , p_job_id bigint DEFAULT NULL
)
    RETURNS void
    LANGUAGE plpgsql
    AS $$
DECLARE

ex_context                      text;
ex_detail                       text;
ex_hint                         text;
ex_message                      text;
v_child_exists                  text;
v_child_tablename               text;
v_child_value                   text;
v_col                           text;
v_constraint_cols               text[];
v_constraint_name               text;
v_constraint_valid              boolean;
v_constraint_values             record;
v_control                       text;
v_control_type                  text;
v_existing_constraint_name      text;
v_job_id                        bigint;
v_jobmon                        boolean;
v_jobmon_schema                 text;
-- Never assigned in this function; retained only so the DEBUG message below keeps its format.
v_last_partition                text;
v_new_search_path               text;
v_old_search_path               text;
v_optimize_constraint           int;
v_optimize_counter              int := 0;
v_row_max_value                 record;
v_parent_schema                 text;
v_parent_table                  text;
v_parent_tablename              text;
v_sql                           text;
v_step_id                       bigint;

BEGIN
/*
 * Apply constraints managed by partman extension
 *
 * For every column listed in part_config.constraint_cols of the given partition set, adds a
 * CHECK (col >= min AND col <= max) constraint to one child table, using the min/max values
 * currently stored in that child.
 *
 * Parameters:
 *   p_parent_table - parent table as stored in part_config.parent_table (schema.table).
 *   p_child_table  - optional target child (schema.table). Only the part after the first '.' is
 *                    used; the child is looked up in the parent's schema. When NULL, the target is
 *                    the child found <optimize_constraint> + 1 positions behind the newest
 *                    non-empty child.
 *   p_analyze      - when true, ANALYZE is run on the parent table after constraints are applied.
 *   p_job_id       - optional existing pg_jobmon job to log steps to. When NULL and jobmon logging
 *                    is enabled, this function creates and closes its own job. A job passed in by
 *                    the caller is left open for the caller to close.
 *
 * Returns silently (no error) when the partition set has no constraint_cols configured or when the
 * target child table does not exist.
 *
 * Any error is logged to pg_jobmon (if available) and re-raised with context, detail and hint
 * appended to the message.
 */

SELECT parent_table
    , control
    , optimize_constraint
    , constraint_cols
    , jobmon
    , constraint_valid
INTO v_parent_table
    , v_control
    , v_optimize_constraint
    , v_constraint_cols
    , v_jobmon
    , v_constraint_valid
FROM @extschema@.part_config
WHERE parent_table = p_parent_table
AND constraint_cols IS NOT NULL;

IF v_constraint_cols IS NULL THEN
    RAISE DEBUG 'apply_constraints: Given parent table (%) not set up for constraint management (constraint_cols is NULL)', p_parent_table;
    -- Returns silently to allow this function to be simply called by maintenance processes without having to check if config options are set.
    RETURN;
END IF;

SELECT schemaname, tablename
INTO v_parent_schema, v_parent_tablename
FROM pg_catalog.pg_tables
WHERE schemaname = split_part(v_parent_table, '.', 1)::name
AND tablename = split_part(v_parent_table, '.', 2)::name;

-- The result is not used further in this function.
-- NOTE(review): call retained in case check_control_type validates the control column - confirm.
SELECT general_type INTO v_control_type FROM @extschema@.check_control_type(v_parent_schema, v_parent_tablename, v_control);

-- Put the extension schema (and the pg_jobmon schema, if used) at the front of the search_path so
-- the unqualified jobmon calls below resolve. The original path is restored before returning.
SELECT current_setting('search_path') INTO v_old_search_path;
IF length(v_old_search_path) > 0 THEN
    v_new_search_path := '@extschema@,pg_temp,'||v_old_search_path;
ELSE
    v_new_search_path := '@extschema@,pg_temp';
END IF;
IF v_jobmon THEN
    SELECT n.nspname
    INTO v_jobmon_schema
    FROM pg_catalog.pg_namespace n
    INNER JOIN pg_catalog.pg_extension e ON e.extnamespace = n.oid
    WHERE e.extname = 'pg_jobmon'::name;
    IF v_jobmon_schema IS NOT NULL THEN
        v_new_search_path := format('%s,%s',v_jobmon_schema, v_new_search_path);
    END IF;
END IF;
EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_new_search_path, 'false');

IF v_jobmon_schema IS NOT NULL THEN
    IF p_job_id IS NULL THEN
        v_job_id := add_job(format('PARTMAN CREATE CONSTRAINT: %s', v_parent_table));
    ELSE
        v_job_id = p_job_id;
    END IF;
END IF;

-- If p_child_table is null, figure out the partition that is the one right before the optimize_constraint value backwards.
IF p_child_table IS NULL THEN
    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, 'Applying additional constraints: Automatically determining most recent child on which to apply constraints');
    END IF;

    -- Loop through child tables starting from highest to get a value from the highest non-empty partition in the set
    -- Once a child table with a value is found, go back <optimize_constraint> + 1 children to make the constraint on that child
    FOR v_row_max_value IN
        SELECT partition_schemaname, partition_tablename FROM @extschema@.show_partitions(p_parent_table, 'DESC', false)
    LOOP
        IF v_child_value IS NULL THEN
            -- Still looking for the newest child that holds a non-NULL control value
            EXECUTE format('SELECT %I::text FROM %I.%I LIMIT 1'
                        , v_control
                        , v_row_max_value.partition_schemaname
                        , v_row_max_value.partition_tablename
                    ) INTO v_child_value;
        ELSE
            -- Newest populated child already found; count older children until past the optimize window
            v_optimize_counter := v_optimize_counter + 1;
            IF v_optimize_counter > v_optimize_constraint THEN
                v_child_tablename := v_row_max_value.partition_tablename;
                EXIT;
            END IF;
        END IF;
    END LOOP;

    RAISE DEBUG 'apply_constraint: v_parent_tablename: %, v_last_partition: %, v_child_tablename: %, v_optimize_counter: %'
        , v_parent_tablename, v_last_partition, v_child_tablename, v_optimize_counter;

    IF v_jobmon_schema IS NOT NULL THEN
        PERFORM update_step(v_step_id, 'OK', format('Target child table: %s.%s', v_parent_schema, v_child_tablename));
    END IF;
ELSE
    -- Only the table-name portion is taken; the child is assumed to live in the parent's schema
    v_child_tablename = split_part(p_child_table, '.', 2);
END IF;

IF v_jobmon_schema IS NOT NULL THEN
    v_step_id := add_step(v_job_id, 'Applying additional constraints: Checking if target child table exists');
END IF;

SELECT tablename
INTO v_child_exists
FROM pg_catalog.pg_tables
WHERE schemaname = v_parent_schema::name
AND tablename = v_child_tablename::name;

IF v_child_exists IS NULL THEN
    IF v_jobmon_schema IS NOT NULL THEN
        PERFORM update_step(v_step_id, 'NOTICE', format('Target child table (%s) does not exist. Skipping constraint creation.', v_child_tablename));
        -- Only close the job if this function created it
        IF p_job_id IS NULL THEN
            PERFORM close_job(v_job_id);
        END IF;
    END IF;
    RAISE DEBUG 'Target child table (%) does not exist. Skipping constraint creation.', v_child_tablename;
    EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');
    RETURN;
ELSE
    IF v_jobmon_schema IS NOT NULL THEN
        PERFORM update_step(v_step_id, 'OK', 'Done');
    END IF;
END IF;

FOREACH v_col IN ARRAY v_constraint_cols
LOOP
    -- Look for an existing partman-named check constraint that covers this column on the child.
    -- Note that the '_' in the LIKE pattern is itself a single-character wildcard.
    SELECT con.conname
    INTO v_existing_constraint_name
    FROM pg_catalog.pg_constraint con
    INNER JOIN pg_catalog.pg_class c ON c.oid = con.conrelid
    INNER JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    INNER JOIN pg_catalog.pg_attribute a ON con.conrelid = a.attrelid
    WHERE c.relname = v_child_tablename::name
        AND n.nspname = v_parent_schema::name
        AND con.conname LIKE 'partmanconstr_%'
        AND con.contype = 'c'
        AND a.attname = v_col::name
        AND ARRAY[a.attnum] OPERATOR(pg_catalog.<@) con.conkey
        AND a.attisdropped = false;

    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, format('Applying additional constraints: Applying new constraint on column: %s', v_col));
    END IF;

    IF v_existing_constraint_name IS NOT NULL THEN
        IF v_jobmon_schema IS NOT NULL THEN
            PERFORM update_step(v_step_id, 'NOTICE', format('Partman managed constraint already exists on this table (%s) and column (%s). Skipping creation.', v_child_tablename, v_col));
        END IF;
        RAISE DEBUG 'Partman managed constraint already exists on this table (%) and column (%). Skipping creation.', v_child_tablename, v_col ;
        CONTINUE;
    END IF;

    -- Ensure column name gets put on end of constraint name to help avoid naming conflicts
    v_constraint_name := @extschema@.check_name_length('partmanconstr_'||v_child_tablename, p_suffix := '_'||v_col);

    EXECUTE format('SELECT min(%I)::text AS min, max(%I)::text AS max FROM %I.%I', v_col, v_col, v_parent_schema, v_child_tablename) INTO v_constraint_values;

    -- A record IS NOT NULL only when every field is non-null, i.e. the column has at least one non-NULL value
    IF v_constraint_values IS NOT NULL THEN
        v_sql := format('ALTER TABLE %I.%I ADD CONSTRAINT %I CHECK (%I >= %L AND %I <= %L)'
                            , v_parent_schema
                            , v_child_tablename
                            , v_constraint_name
                            , v_col
                            , v_constraint_values.min
                            , v_col
                            , v_constraint_values.max);

        IF v_constraint_valid = false THEN
            v_sql := format('%s NOT VALID', v_sql);
        END IF;

        RAISE DEBUG 'Constraint creation query: %', v_sql;
        EXECUTE v_sql;

        IF v_jobmon_schema IS NOT NULL THEN
            PERFORM update_step(v_step_id, 'OK', format('New constraint created: %s', v_sql));
        END IF;
    ELSE
        RAISE DEBUG 'Given column (%) contains all NULLs. No constraint created', v_col;
        IF v_jobmon_schema IS NOT NULL THEN
            PERFORM update_step(v_step_id, 'NOTICE', format('Given column (%s) contains all NULLs. No constraint created', v_col));
        END IF;
    END IF;

END LOOP;

IF p_analyze THEN
    IF v_jobmon_schema IS NOT NULL THEN
        v_step_id := add_step(v_job_id, format('Applying additional constraints: Running analyze on partition set: %s', v_parent_table));
    END IF;
    RAISE DEBUG 'Running analyze on partition set: %', v_parent_table;

    EXECUTE format('ANALYZE %I.%I', v_parent_schema, v_parent_tablename);

    IF v_jobmon_schema IS NOT NULL THEN
        PERFORM update_step(v_step_id, 'OK', 'Done');
    END IF;
END IF;

-- Only close the job if this function created it; a job supplied through p_job_id belongs to the
-- caller (same rule as the early-return path above).
IF v_jobmon_schema IS NOT NULL AND p_job_id IS NULL THEN
    PERFORM close_job(v_job_id);
END IF;

EXECUTE format('SELECT set_config(%L, %L, %L)', 'search_path', v_old_search_path, 'false');

EXCEPTION
    WHEN OTHERS THEN
        GET STACKED DIAGNOSTICS ex_message = MESSAGE_TEXT,
                                ex_context = PG_EXCEPTION_CONTEXT,
                                ex_detail = PG_EXCEPTION_DETAIL,
                                ex_hint = PG_EXCEPTION_HINT;
        -- jobmon functions are schema-qualified here rather than resolved through the search_path
        IF v_jobmon_schema IS NOT NULL THEN
            IF v_job_id IS NULL THEN
                -- Job name is passed as a quoted literal (%L) so a quote in the table name cannot break this statement
                EXECUTE format('SELECT %I.add_job(%L)', v_jobmon_schema, format('PARTMAN CREATE CONSTRAINT: %s', p_parent_table)) INTO v_job_id;
                EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before job logging started'')', v_jobmon_schema, v_job_id) INTO v_step_id;
            ELSIF v_step_id IS NULL THEN
                EXECUTE format('SELECT %I.add_step(%s, ''EXCEPTION before first step logged'')', v_jobmon_schema, v_job_id) INTO v_step_id;
            END IF;
            EXECUTE format('SELECT %I.update_step(%s, ''CRITICAL'', %L)', v_jobmon_schema, v_step_id, 'ERROR: '||coalesce(SQLERRM,'unknown'));
            EXECUTE format('SELECT %I.fail_job(%s)', v_jobmon_schema, v_job_id);
        END IF;
        RAISE EXCEPTION '%
CONTEXT: %
DETAIL: %
HINT: %', ex_message, ex_context, ex_detail, ex_hint;
END
$$;
|