1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
|
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.
-- policy_retention(job_id, config): procedure implemented in C by the
-- ts_policy_retention_proc symbol of the library named by @MODULE_PATHNAME@.
CREATE OR REPLACE PROCEDURE _timescaledb_functions.policy_retention(
    job_id INTEGER,
    config JSONB
)
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_retention_proc';
-- policy_retention_check(config): void function implemented in C by the
-- ts_policy_retention_check symbol of the library named by @MODULE_PATHNAME@.
CREATE OR REPLACE FUNCTION _timescaledb_functions.policy_retention_check(
    config JSONB
)
RETURNS void
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_retention_check';
-- policy_reorder(job_id, config): procedure implemented in C by the
-- ts_policy_reorder_proc symbol of the library named by @MODULE_PATHNAME@.
CREATE OR REPLACE PROCEDURE _timescaledb_functions.policy_reorder(
    job_id INTEGER,
    config JSONB
)
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_proc';
-- policy_reorder_check(config): void function implemented in C by the
-- ts_policy_reorder_check symbol of the library named by @MODULE_PATHNAME@.
CREATE OR REPLACE FUNCTION _timescaledb_functions.policy_reorder_check(
    config JSONB
)
RETURNS void
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_check';
-- policy_recompression(job_id, config): procedure implemented in C by the
-- ts_policy_recompression_proc symbol of the library named by
-- @MODULE_PATHNAME@.
CREATE OR REPLACE PROCEDURE _timescaledb_functions.policy_recompression(
    job_id INTEGER,
    config JSONB
)
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_recompression_proc';
-- policy_compression_check(config): void function implemented in C by the
-- ts_policy_compression_check symbol of the library named by
-- @MODULE_PATHNAME@.
CREATE OR REPLACE FUNCTION _timescaledb_functions.policy_compression_check(
    config JSONB
)
RETURNS void
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_compression_check';
-- policy_refresh_continuous_aggregate(job_id, config): procedure implemented
-- in C by the ts_policy_refresh_cagg_proc symbol of the library named by
-- @MODULE_PATHNAME@.
CREATE OR REPLACE PROCEDURE _timescaledb_functions.policy_refresh_continuous_aggregate(
    job_id INTEGER,
    config JSONB
)
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_proc';
-- policy_refresh_continuous_aggregate_check(config): void function
-- implemented in C by the ts_policy_refresh_cagg_check symbol of the library
-- named by @MODULE_PATHNAME@.
CREATE OR REPLACE FUNCTION _timescaledb_functions.policy_refresh_continuous_aggregate_check(
    config JSONB
)
RETURNS void
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_check';
-- policy_process_hypertable_invalidations(job_id, config): procedure
-- implemented in C by the ts_policy_process_hyper_inval_proc symbol of the
-- library named by @MODULE_PATHNAME@.
CREATE OR REPLACE PROCEDURE _timescaledb_functions.policy_process_hypertable_invalidations(
    job_id INTEGER,
    config JSONB
)
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_process_hyper_inval_proc';
-- policy_process_hypertable_invalidations_check(config): void function
-- implemented in C by the ts_policy_process_hyper_inval_check symbol of the
-- library named by @MODULE_PATHNAME@.
CREATE OR REPLACE FUNCTION _timescaledb_functions.policy_process_hypertable_invalidations_check(
    config JSONB
)
RETURNS void
LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_policy_process_hyper_inval_check';
-- Converts the eligible chunks of one hypertable to columnstore on behalf of
-- a policy job, committing after every chunk (and after every index rebuild).
--
-- Parameters:
--   job_id             id of the calling job; used only in messages
--   htid               hypertable id, resolved through
--                      _timescaledb_catalog.hypertable
--   lag                an interval, or an integer offset for hypertables with
--                      an integer time column (converted with
--                      subtract_integer_from_now before use)
--   maxchunks          stop once this many compress_chunk calls succeeded;
--                      the limit only applies when the value is > 0
--   verbose_log        when true, emit a LOG line after each processed chunk
--   recompress_enabled when TRUE, compress_chunk is called for each selected
--                      chunk (see the NOTE(review) inside the loop)
--   reindex_enabled    when TRUE, rebuild the indexes of chunks that carried
--                      the "partial" status bit when they were selected
--   use_creation_time  pass the lag to show_chunks as created_before instead
--                      of older_than; not allowed with an integer lag
--
-- Errors:
--   * raises when use_creation_time is combined with an integer lag
--   * per-chunk and per-index failures are downgraded to warnings and
--     counted; if any were counted the procedure finishes by raising
--     'columnstore policy failure' with ERRCODE data_exception
CREATE OR REPLACE PROCEDURE
_timescaledb_functions.policy_compression_execute(
  job_id              INTEGER,
  htid                INTEGER,
  lag                 ANYELEMENT,
  maxchunks           INTEGER,
  verbose_log         BOOLEAN,
  recompress_enabled  BOOLEAN,
  reindex_enabled     BOOLEAN,
  use_creation_time   BOOLEAN
)
AS $$
DECLARE
  htoid       REGCLASS;
  chunk_rec   RECORD;
  idx_rec     RECORD;
  numchunks_compressed INTEGER := 0;
  _message     text;
  _detail      text;
  _sqlstate    text;
  -- fully compressed chunk status
  status_fully_compressed int := 1;
  -- chunk status bits:
  bit_compressed int := 1;
  bit_compressed_unordered int := 2;
  bit_frozen int := 4;
  bit_compressed_partial int := 8;
  creation_lag INTERVAL := NULL;
  chunks_failure INTEGER := 0;
BEGIN

  -- procedures with SET clause cannot execute transaction
  -- control so we adjust search_path in procedure body
  SET LOCAL search_path TO pg_catalog, pg_temp;

  SELECT format('%I.%I', schema_name, table_name) INTO htoid
  FROM _timescaledb_catalog.hypertable
  WHERE id = htid;

  -- for the integer cases, we have to compute the lag w.r.t
  -- the integer_now function and then pass on to show_chunks
  IF pg_typeof(lag) IN ('BIGINT'::regtype, 'INTEGER'::regtype, 'SMALLINT'::regtype) THEN
    -- cannot have use_creation_time set with this
    IF use_creation_time IS TRUE THEN
      RAISE EXCEPTION 'job % cannot use creation time with integer_now function', job_id;
    END IF;
    lag := _timescaledb_functions.subtract_integer_from_now(htoid, lag::BIGINT);
  END IF;

  -- if use_creation_time has been specified then the lag needs to be used with the
  -- "compress_created_before" argument. Otherwise the usual "older_than" argument
  -- is good enough
  IF use_creation_time IS TRUE THEN
    creation_lag := lag;
    lag := NULL;
  END IF;

  FOR chunk_rec IN
    SELECT
      show.oid, ch.schema_name, ch.table_name, ch.status
    FROM
      @extschema@.show_chunks(htoid, older_than => lag, created_before => creation_lag) AS show(oid)
      INNER JOIN pg_class pgc ON pgc.oid = show.oid
      INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
      INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
    WHERE NOT ch.dropped
      AND NOT ch.osm_chunk
      -- Checking for chunks which are not fully compressed and not frozen
      AND ch.status != status_fully_compressed
      AND ch.status & bit_frozen = 0
  LOOP
    -- The nested block turns a failure on one chunk into a warning so the
    -- remaining chunks are still processed.
    BEGIN
      -- NOTE(review): the query above already filters out status =
      -- status_fully_compressed (1), which is the same value as
      -- bit_compressed, so the first test looks unreachable and the call
      -- is effectively gated by recompress_enabled alone -- confirm intent.
      IF chunk_rec.status = bit_compressed OR recompress_enabled IS TRUE THEN
        PERFORM @extschema@.compress_chunk(chunk_rec.oid);
        numchunks_compressed := numchunks_compressed + 1;
      END IF;
    EXCEPTION WHEN OTHERS THEN
      GET STACKED DIAGNOSTICS
          _message = MESSAGE_TEXT,
          _detail = PG_EXCEPTION_DETAIL,
          _sqlstate = RETURNED_SQLSTATE;
      RAISE WARNING 'converting chunk "%" to columnstore failed when recompress columnstore policy is executed', chunk_rec.oid::regclass::text
          USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
                ERRCODE = _sqlstate;
      chunks_failure := chunks_failure + 1;
    END;
    COMMIT;
    -- SET LOCAL is only active until end of transaction.
    -- While we could use SET at the start of the function we do not
    -- want to bleed out search_path to caller, so we do SET LOCAL
    -- again right after every COMMIT, before any further statement
    -- (including the reindex lookup below) is executed.
    SET LOCAL search_path TO pg_catalog, pg_temp;

    -- went through recompression successfully now reindex indexes
    IF (chunk_rec.status & bit_compressed_partial = bit_compressed_partial) AND (reindex_enabled IS TRUE) THEN
      FOR idx_rec IN
        SELECT idx.schemaname, idx.indexname
        FROM pg_indexes idx
        JOIN _timescaledb_catalog.chunk ch ON ch.schema_name = idx.schemaname AND ch.table_name = idx.tablename
        WHERE idx.schemaname = chunk_rec.schema_name
          AND idx.tablename = chunk_rec.table_name
          -- the chunk status is re-read here, so indexes are only rebuilt
          -- when the chunk is now recorded as fully compressed
          AND ch.status = status_fully_compressed
      LOOP
        BEGIN
          EXECUTE format('REINDEX INDEX %I.%I;', idx_rec.schemaname, idx_rec.indexname);
        EXCEPTION WHEN OTHERS THEN
          GET STACKED DIAGNOSTICS
              _message = MESSAGE_TEXT,
              _detail = PG_EXCEPTION_DETAIL,
              _sqlstate = RETURNED_SQLSTATE;
          RAISE WARNING 'reindexing index "%.%" for chunk "%" to columnstore failed when columnstore policy is executed', idx_rec.schemaname, idx_rec.indexname, chunk_rec.oid::regclass::text
              USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
                    ERRCODE = _sqlstate;
          -- a failed index rebuild is counted in the same failure total
          chunks_failure := chunks_failure + 1;
        END;
        COMMIT;
        -- the COMMIT dropped the SET LOCAL value again; restore it
        SET LOCAL search_path TO pg_catalog, pg_temp;
      END LOOP;
    END IF;

    IF verbose_log THEN
       RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
    END IF;

    IF maxchunks > 0 AND numchunks_compressed >= maxchunks THEN
      EXIT;
    END IF;
  END LOOP;

  IF chunks_failure > 0 THEN
    RAISE EXCEPTION 'columnstore policy failure'
      USING
        DETAIL = format('Failed to convert %L chunks to columnstore. Successfully converted %L chunks.', chunks_failure, numchunks_compressed),
        ERRCODE = 'data_exception';
  END IF;
END;
$$ LANGUAGE PLPGSQL;
-- Job entry point for the compression (columnstore) policy: reads the job's
-- JSONB config, works out the type of the lag value and delegates to
-- _timescaledb_functions.policy_compression_execute.
--
-- Config keys read:
--   hypertable_id            required
--   compress_after           lag, cast according to the type of the
--                            hypertable's first dimension (lowest dimension id)
--   compress_created_before  used only when compress_after is absent; always
--                            treated as an interval and switches the
--                            execution to creation-time based selection
--   verbose_log              optional, defaults to FALSE
--   maxchunks_to_compress    optional, defaults to 0
--   recompress               optional, defaults to TRUE
--   reindex                  optional, defaults to TRUE
--
-- Raises:
--   * when config is NULL or has no hypertable_id
--   * when neither compress_after nor compress_created_before is present
--   * with SQLSTATE case_not_found when the resolved column type is not one
--     of the handled types (or no dimension row was found)
CREATE OR REPLACE PROCEDURE
_timescaledb_functions.policy_compression(job_id INTEGER, config JSONB)
AS $$
DECLARE
  dimtype                 REGTYPE;
  compress_after          TEXT;
  compress_created_before TEXT;
  lag_value               TEXT;
  htid                    INTEGER;
  verbose_log             BOOL;
  maxchunks               INTEGER := 0;
  recompress_enabled      BOOL;
  reindex_enabled         BOOL;
  use_creation_time       BOOL := FALSE;
BEGIN

  -- procedures with SET clause cannot execute transaction
  -- control so we adjust search_path in procedure body
  SET LOCAL search_path TO pg_catalog, pg_temp;

  IF config IS NULL THEN
    RAISE EXCEPTION 'job % has null config', job_id;
  END IF;

  htid := jsonb_object_field_text(config, 'hypertable_id')::INTEGER;
  IF htid is NULL THEN
    RAISE EXCEPTION 'job % config must have hypertable_id', job_id;
  END IF;

  verbose_log         := COALESCE(jsonb_object_field_text(config, 'verbose_log')::BOOLEAN, FALSE);
  maxchunks           := COALESCE(jsonb_object_field_text(config, 'maxchunks_to_compress')::INTEGER, 0);
  recompress_enabled  := COALESCE(jsonb_object_field_text(config, 'recompress')::BOOLEAN, TRUE);
  reindex_enabled     := COALESCE(jsonb_object_field_text(config, 'reindex')::BOOLEAN, TRUE);

  -- find primary dimension type --
  SELECT dim.column_type INTO dimtype
  FROM _timescaledb_catalog.hypertable ht
    JOIN _timescaledb_catalog.dimension dim ON ht.id = dim.hypertable_id
  WHERE ht.id = htid
  ORDER BY dim.id
  LIMIT 1;

  compress_after := jsonb_object_field_text(config, 'compress_after');
  IF compress_after IS NULL THEN
    compress_created_before := jsonb_object_field_text(config, 'compress_created_before');
    IF compress_created_before IS NULL THEN
      RAISE EXCEPTION 'job % config must have compress_after or compress_created_before', job_id;
    END IF;
    lag_value := compress_created_before;
    use_creation_time := true;
    -- creation-time lags are always intervals, whatever the column type is
    dimtype := 'INTERVAL' ::regtype;
  ELSE
    lag_value := compress_after;
  END IF;

  -- cast the lag value to the type matching the dimension and execute
  CASE dimtype
    WHEN 'TIMESTAMP'::regtype, 'TIMESTAMPTZ'::regtype, 'DATE'::regtype, 'INTERVAL' ::regtype, 'UUID'::regtype  THEN
      CALL _timescaledb_functions.policy_compression_execute(job_id, htid, lag_value::INTERVAL, maxchunks, verbose_log, recompress_enabled, reindex_enabled, use_creation_time);
    WHEN 'BIGINT'::regtype THEN
      CALL _timescaledb_functions.policy_compression_execute(job_id, htid, lag_value::BIGINT, maxchunks, verbose_log, recompress_enabled, reindex_enabled, use_creation_time);
    WHEN 'INTEGER'::regtype THEN
      CALL _timescaledb_functions.policy_compression_execute(job_id, htid, lag_value::INTEGER, maxchunks, verbose_log, recompress_enabled, reindex_enabled, use_creation_time);
    WHEN 'SMALLINT'::regtype THEN
      CALL _timescaledb_functions.policy_compression_execute(job_id, htid, lag_value::SMALLINT, maxchunks, verbose_log, recompress_enabled, reindex_enabled, use_creation_time);
    ELSE
      -- Without this branch PL/pgSQL raises a bare "case not found" error;
      -- keep the same SQLSTATE but say which job and hypertable are affected.
      RAISE EXCEPTION 'job % found no supported partitioning column type for hypertable %', job_id, htid
        USING DETAIL = format('Resolved column type: %s.', COALESCE(dimtype::text, 'none')),
              ERRCODE = 'case_not_found';
  END CASE;
  COMMIT;
END;
$$ LANGUAGE PLPGSQL;
|