1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
|
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.
-- Get information about the materialization table and bucket width.
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_materialization_info(
continuous_aggregate REGCLASS
) RETURNS RECORD AS
$body$
DECLARE
info RECORD;
BEGIN
SELECT mat_hypertable_id AS materialization_id,
bucket_width::interval AS bucket_width
INTO info
FROM _timescaledb_catalog.continuous_agg,
LATERAL _timescaledb_functions.cagg_get_bucket_function_info(mat_hypertable_id)
WHERE format('%I.%I', user_view_schema, user_view_name)::regclass = continuous_aggregate;
IF NOT FOUND THEN
RAISE '"%" is not a continuous aggregate', continuous_aggregate
USING ERRCODE = 'wrong_object_type';
END IF;
RETURN info;
END
$body$ LANGUAGE plpgsql
SET search_path = pg_catalog, pg_temp;
-- Get hypertable id for a hypertable and execute common checks to
-- avoid duplicating them in the overloaded functions below.
--
-- This function is part of the internal API and not intended for
-- public consumption.
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_hypertable_id(
hypertable REGCLASS,
column_type REGTYPE
) RETURNS integer AS
$body$
DECLARE
info RECORD;
BEGIN
SELECT ht.id AS hypertable_id,
di.column_type::regtype,
EXISTS(SELECT FROM _timescaledb_catalog.continuous_agg where raw_hypertable_id = ht.id) AS has_cagg
INTO info
FROM _timescaledb_catalog.hypertable ht
JOIN _timescaledb_catalog.dimension di ON ht.id = di.hypertable_id
WHERE format('%I.%I', schema_name, table_name)::regclass = hypertable
AND di.interval_length IS NOT NULL;
IF info IS NULL THEN
RAISE EXCEPTION 'table "%" is not a hypertable', hypertable
USING ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF NOT info.has_cagg THEN
RAISE EXCEPTION 'hypertable "%" has no continuous aggregate', hypertable
USING HINT = 'Define a continuous aggregate for the hypertable to read invalidations.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF info.column_type <> get_hypertable_id.column_type THEN
RAISE EXCEPTION 'wrong column type for hypertable %', hypertable
USING HINT = format('hypertable type was "%s", but caller expected "%s"',
info.column_type, get_hypertable_id.column_type),
ERRCODE = 'datatype_mismatch';
END IF;
RETURN info.hypertable_id;
END
$body$ LANGUAGE plpgsql
SET search_path = pg_catalog, pg_temp;
-- Get hypertable invalidations ranges based on bucket size.
--
-- This will return a multirange for each bucket size passed in and a
-- token that can be used to accept the multirange.
--
-- Note that the token returned is not unique for each bucket size and
-- represents either the LSN or a Snapshot of what data was read to
-- produce the bucket ranges.
--
-- Currently, we only have support for timestamp with and without
-- timezone, but it is straightforward to add similar implementations
-- for integer types.
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_hypertable_invalidations(
hypertable REGCLASS,
base TIMESTAMPTZ,
bucket_widths INTERVAL[]
) RETURNS TABLE (bucket_width INTERVAL, token TEXT, invalidations TSTZMULTIRANGE) AS
$body$
DECLARE
l_hypertable_id INTEGER := _timescaledb_functions.get_hypertable_id(hypertable, 'timestamptz'::regtype);
BEGIN
RETURN QUERY (
WITH
-- Collect ranges from the invalidation log and convert them
-- to correct type.
timestamps AS MATERIALIZED (
SELECT _timescaledb_functions.to_timestamp(lowest_modified_value) AS start,
_timescaledb_functions.to_timestamp(greatest_modified_value) AS finish
FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
WHERE hypertable_id = l_hypertable_id
),
-- Since a range can end at the end of a bucket and this
-- range should not include the next bucket, we need to
-- subtract one microsecond before computing the bucket and
-- then add the width again.
ranges AS MATERIALIZED (
SELECT width,
tstzrange(@extschema@.time_bucket(width, start),
@extschema@.time_bucket(width, finish - '1 microsecond'::interval) + width) AS bucket
FROM timestamps CROSS JOIN UNNEST(bucket_widths) w(width)
)
SELECT width, pg_current_snapshot()::text, range_agg(bucket) ranges
FROM ranges GROUP BY width
);
END
$body$
LANGUAGE plpgsql
SET search_path = pg_catalog, pg_temp;
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_hypertable_invalidations(
hypertable REGCLASS,
base TIMESTAMP,
bucket_widths INTERVAL[]
) RETURNS TABLE (bucket_width INTERVAL, token TEXT, invalidations TSMULTIRANGE) AS
$body$
DECLARE
l_hypertable_id INTEGER := _timescaledb_functions.get_hypertable_id(hypertable, 'timestamp'::regtype);
BEGIN
RETURN QUERY (
WITH
-- Collect ranges from the invalidation log and convert them
-- to correct type.
timestamps AS MATERIALIZED (
SELECT _timescaledb_functions.to_timestamp_without_timezone(lowest_modified_value) AS start,
_timescaledb_functions.to_timestamp_without_timezone(greatest_modified_value) AS finish
FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
WHERE hypertable_id = l_hypertable_id
),
-- Compute bucket-aligned ranges from the ranges above. Since
-- a range can end at the end of a bucket and this range
-- should not include the next bucket, we need to subtract
-- one microsecond before computing the bucket and then add
-- the width again.
ranges AS MATERIALIZED (
SELECT width,
tsrange(@extschema@.time_bucket(width, start),
@extschema@.time_bucket(width, finish - '1 microsecond'::interval) + width) AS bucket
FROM timestamps CROSS JOIN UNNEST(bucket_widths) w(width)
)
SELECT width, pg_current_snapshot()::text, range_agg(bucket)
FROM ranges GROUP BY width
);
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;
-- Add new invalidations to the materialization invalidation log.
--
-- This will add the range to the materialization invalidations for
-- the continuous aggregate. The range will automatically be "aligned"
-- to the bucket width to ensure that it covers all buckets that it
-- touches.
CREATE OR REPLACE PROCEDURE _timescaledb_functions.add_materialization_invalidations(
continuous_aggregate regclass,
invalidation tsrange
) AS
$body$
DECLARE
info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate);
aligned TSRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, invalidation);
BEGIN
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
VALUES (info.materialization_id,
_timescaledb_functions.to_unix_microseconds(lower(aligned)),
_timescaledb_functions.to_unix_microseconds(upper(aligned)));
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;
CREATE OR REPLACE PROCEDURE _timescaledb_functions.add_materialization_invalidations(
continuous_aggregate REGCLASS,
invalidation TSTZRANGE
) AS
$body$
DECLARE
info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate);
aligned TSTZRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, invalidation);
BEGIN
INSERT INTO _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
VALUES (info.materialization_id,
_timescaledb_functions.to_unix_microseconds(lower(aligned)),
_timescaledb_functions.to_unix_microseconds(upper(aligned)));
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;
-- Get raw ranges from the materialization invalidation log
--
-- This is a cleaned-up version of the timestamps, still in Unix
-- microseconds, with nulls for '-infinity' and '+infinity' and
-- invalid entries removed.
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_raw_materialization_ranges(typ regtype)
RETURNS TABLE (materialization_id integer,
lowest_modified_value bigint,
greatest_modified_value bigint)
AS $$
WITH
min_max_values AS MATERIALIZED (
SELECT _timescaledb_functions.get_internal_time_min(typ) AS min,
_timescaledb_functions.get_internal_time_max(typ) AS max
)
SELECT materialization_id,
NULLIF(lowest_modified_value, min_max_values.min),
NULLIF(greatest_modified_value, min_max_values.max)
FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log, min_max_values
WHERE lowest_modified_value
BETWEEN min_max_values.min
AND min_max_values.max
AND greatest_modified_value
BETWEEN min_max_values.min
AND min_max_values.max
$$
LANGUAGE SQL
SET search_path TO pg_catalog, pg_temp;
-- Get materialization invalidations for a continuous aggregate.
--
-- Note that this will modify the materialization invalidation table
-- to be able to extract the restricted range of invalidations.
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_materialization_invalidations(
continuous_aggregate REGCLASS,
restriction TSTZRANGE
) RETURNS TABLE (invalidations TSTZMULTIRANGE) AS
$body$
DECLARE
info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate);
aligned TSTZRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, restriction);
BEGIN
-- Compute the multirange for the invalidations inside the
-- restriction passed down to the function and return the ranges.
RETURN QUERY
WITH
ranges AS (
SELECT materialization_id,
range_agg(_timescaledb_functions.make_multirange_from_internal_time(
null::tstzrange,
lowest_modified_value,
greatest_modified_value)) AS invals
FROM _timescaledb_functions.get_raw_materialization_ranges('timestamptz'::regtype)
GROUP BY materialization_id
)
SELECT range_agg(invals * multirange(aligned))
FROM ranges
WHERE invals && aligned
AND materialization_id = info.materialization_id;
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;
CREATE OR REPLACE FUNCTION _timescaledb_functions.get_materialization_invalidations(
continuous_aggregate REGCLASS,
restriction TSRANGE
) RETURNS TABLE (invalidations TSMULTIRANGE) AS
$body$
DECLARE
info RECORD := _timescaledb_functions.get_materialization_info(continuous_aggregate);
aligned TSRANGE := _timescaledb_functions.align_to_bucket(info.bucket_width, restriction);
BEGIN
-- Compute the multirange for the invalidations inside the
-- restriction passed down to the function and return the ranges.
RETURN QUERY
WITH
ranges AS (
SELECT materialization_id,
range_agg(_timescaledb_functions.make_multirange_from_internal_time(
null::tsrange,
lowest_modified_value,
greatest_modified_value)) AS invals
FROM _timescaledb_functions.get_raw_materialization_ranges('timestamp'::regtype)
GROUP BY materialization_id
)
SELECT range_agg(invals * multirange(aligned))
FROM ranges
WHERE invals && aligned
AND materialization_id = info.materialization_id;
END
$body$ LANGUAGE plpgsql
SET search_path = pg_catalog, pg_temp;
-- Accept a set of hypertable invalidations for a hypertable.
--
-- This procedure is used to accept all invalidations for a hypertable
-- using the token returned by a previous call of
-- get_hypertable_invalidations().
CREATE OR REPLACE PROCEDURE _timescaledb_functions.accept_hypertable_invalidations(
hypertable REGCLASS,
token TEXT
) AS
$body$
DECLARE
info RECORD;
errmsg TEXT;
BEGIN
SELECT ht.id AS hypertable_id,
(cagg.raw_hypertable_id IS NOT NULL) AS has_cagg
INTO info
FROM _timescaledb_catalog.hypertable ht
LEFT JOIN _timescaledb_catalog.continuous_agg cagg ON cagg.raw_hypertable_id = ht.id
WHERE format('%I.%I', schema_name, table_name)::regclass = hypertable;
IF info IS NULL THEN
RAISE EXCEPTION 'table "%" is not a hypertable', hypertable
USING ERRCODE = 'object_not_in_prerequisite_state';
END IF;
IF NOT info.has_cagg THEN
RAISE EXCEPTION 'hypertable "%" has no continuous aggregate', hypertable
USING HINT = 'Define a continuous aggregate for the hypertable to handle invalidations.',
ERRCODE = 'object_not_in_prerequisite_state';
END IF;
DELETE FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
WHERE hypertable_id = info.hypertable_id
AND pg_visible_in_snapshot(xmin::text::xid8, token::pg_snapshot);
EXCEPTION
WHEN invalid_text_representation THEN
RAISE EXCEPTION '%', SQLERRM
USING HINT = 'Use the token from the get_hypertable_invalidations() call.',
ERRCODE = 'invalid_text_representation';
END
$body$
LANGUAGE plpgsql
SET search_path TO pg_catalog, pg_temp;
CREATE OR REPLACE PROCEDURE _timescaledb_functions.process_hypertable_invalidations(
hypertable NAME
) LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_continuous_agg_process_hypertable_invalidations';
CREATE OR REPLACE PROCEDURE _timescaledb_functions.process_hypertable_invalidations(
hypertable REGCLASS
) LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_continuous_agg_process_hypertable_invalidations';
CREATE OR REPLACE PROCEDURE _timescaledb_functions.process_hypertable_invalidations(
hypertables REGCLASS[]
) LANGUAGE C
AS '@MODULE_PATHNAME@', 'ts_continuous_agg_process_hypertable_invalidations';
|