1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
|
/* pg_squeeze--1.2--1.3.sql */
-- complain if script is sourced in psql, rather than via ALTER EXTENSION
\echo Use "ALTER EXTENSION pg_squeeze UPDATE TO '1.3'" to load this file. \quit
ALTER TABLE tables_internal DROP COLUMN class_id;
ALTER TABLE tables_internal DROP COLUMN class_id_toast;
ALTER TABLE tables_internal DROP COLUMN free_space;
ALTER TABLE tables_internal DROP COLUMN last_task_created;
DELETE FROM tables;
ALTER TABLE tables DROP COLUMN schedule;
CREATE DOMAIN minute AS int CHECK (VALUE BETWEEN 0 AND 59);
CREATE DOMAIN hour AS int CHECK (VALUE BETWEEN 0 AND 23);
CREATE DOMAIN dom AS int CHECK (VALUE BETWEEN 1 AND 31);
CREATE DOMAIN month AS int CHECK (VALUE BETWEEN 1 AND 12);
CREATE DOMAIN dow AS int CHECK (VALUE BETWEEN 0 AND 7);
CREATE TYPE schedule AS (
minutes minute[],
hours hour[],
days_of_month dom[],
months month[],
days_of_week dow[]);
ALTER TABLE tables ADD COLUMN schedule schedule NOT NULL;
DROP TABLE tasks;
CREATE DOMAIN task_state AS TEXT CHECK(VALUE IN ('new', 'ready', 'processed'));
CREATE TABLE tasks (
id serial NOT NULL PRIMARY KEY,
table_id int NOT NULL REFERENCES tables
ON DELETE CASCADE,
-- Task creation time.
created timestamptz NOT NULL DEFAULT now(),
-- The latest known free space in the underlying table. Note that it
-- includes dead tuples, since these are eliminated by squeeze_table()
-- function.
free_space double precision,
-- How many times did we try to process the task? The common use case
-- is that a concurrent DDL broke the processing.
tried int NOT NULL DEFAULT 0,
-- Either squeezed or skipped by the "squeeze worker" (because there's
-- not enough free space or the table is not big enough).
state task_state NOT NULL DEFAULT 'new'
);
DROP FUNCTION add_new_tasks();
DROP FUNCTION start_next_task();
DROP FUNCTION cleanup_task(a_task_id int);
DROP FUNCTION process_current_task();
DROP FUNCTION stop_worker();
CREATE VIEW scheduled_for_now AS
SELECT i.table_id, t.tabschema, t.tabname
FROM squeeze.tables_internal i,
pg_catalog.pg_stat_user_tables s,
squeeze.tables t,
pg_class c, pg_namespace n
WHERE
(t.tabschema, t.tabname) = (s.schemaname, s.relname) AND
i.table_id = t.id AND
n.nspname = t.tabschema AND c.relnamespace = n.oid AND
c.relname = t.tabname AND
(
((t.schedule).minutes ISNULL OR
EXTRACT(minute FROM now())::int = ANY((t.schedule).minutes))
AND
((t.schedule).hours ISNULL OR
EXTRACT(hour FROM now())::int = ANY((t.schedule).hours))
AND
((t.schedule).months ISNULL OR
EXTRACT(month FROM now())::int = ANY((t.schedule).months))
AND
(
-- At least one of the "days_of_month" and
-- "days_of_week" components must
-- match. However if one matches, NULL value
-- of the other must not be considered "any
-- day of month/week". Instead, NULL can only
-- cause a match if both components have it.
((t.schedule).days_of_month ISNULL AND
(t.schedule).days_of_week ISNULL)
OR
EXTRACT(day FROM now())::int = ANY((t.schedule).days_of_month)
OR
EXTRACT(dow FROM now())::int = ANY((t.schedule).days_of_week)
OR
-- Sunday can be expressed as both 0 and 7.
EXTRACT(isodow FROM now())::int = ANY((t.schedule).days_of_week)
)
);
CREATE FUNCTION check_schedule() RETURNS void
LANGUAGE sql
AS $$
-- Delete the processed tasks, but ignore those scheduled and
-- processed in the current minute - we don't want to schedule those
-- again now.
DELETE FROM squeeze.tasks t
WHERE state = 'processed' AND
(EXTRACT(HOUR FROM now()) <> EXTRACT(HOUR FROM t.created) OR
EXTRACT(MINUTE FROM now()) <> EXTRACT(MINUTE FROM t.created));
-- Create task where schedule does match.
INSERT INTO squeeze.tasks(table_id)
SELECT i.table_id
FROM squeeze.tables_internal i,
pg_catalog.pg_stat_user_tables s,
squeeze.tables t,
pg_class c, pg_namespace n
WHERE
(t.tabschema, t.tabname) = (s.schemaname, s.relname) AND
i.table_id = t.id AND
n.nspname = t.tabschema AND c.relnamespace = n.oid AND
c.relname = t.tabname
-- Is there a matching schedule?
AND EXISTS (
SELECT *
FROM squeeze.scheduled_for_now
WHERE table_id = i.table_id
)
-- Ignore tables for which a task currently exists.
AND NOT t.id IN (SELECT table_id FROM squeeze.tasks);
$$;
-- Update new tasks with the information on free space in the corresponding
-- tables.
CREATE OR REPLACE FUNCTION update_free_space_info() RETURNS void
LANGUAGE sql
AS $$
-- If VACUUM completed recenly enough, we consider the percentage of
-- dead tuples negligible and so retrieve the free space from FSM.
UPDATE squeeze.tasks k
SET free_space = 100 * squeeze.get_heap_freespace(c.oid)
FROM squeeze.tables t,
squeeze.tables_internal i,
pg_catalog.pg_class c,
pg_catalog.pg_namespace n,
pg_catalog.pg_stat_user_tables s
WHERE k.state = 'new' AND k.table_id = t.id AND i.table_id = t.id
AND t.tabname = c.relname AND c.relnamespace = n.oid AND
t.tabschema = n.nspname AND
(t.tabschema, t.tabname) = (s.schemaname, s.relname) AND
(
(s.last_vacuum >= now() - t.vacuum_max_age)
OR
(s.last_autovacuum >= now() - t.vacuum_max_age)
)
AND
-- Each processing makes the previous VACUUM unimportant.
(
i.last_task_finished ISNULL
OR
i.last_task_finished < s.last_vacuum
OR
i.last_task_finished < s.last_autovacuum
);
-- If VACUUM didn't run recently or there's no FSM, take the more
-- expensive approach.
UPDATE squeeze.tasks k
SET free_space = a.approx_free_percent + a.dead_tuple_percent
FROM squeeze.tables t,
pg_catalog.pg_class c,
pg_catalog.pg_namespace n,
squeeze.pgstattuple_approx(c.oid) a
WHERE k.state = 'new' AND k.free_space ISNULL AND
k.table_id = t.id AND t.tabname = c.relname AND
c.relnamespace = n.oid AND t.tabschema = n.nspname;
$$;
CREATE FUNCTION dispatch_new_tasks() RETURNS void
LANGUAGE sql
AS $$
-- First, get rid of tables not big enough for processing.
UPDATE squeeze.tasks k
SET state = 'processed'
FROM squeeze.tables t,
pg_catalog.pg_class c,
pg_catalog.pg_namespace n
WHERE k.state = 'new' AND k.table_id = t.id AND t.tabname = c.relname
AND c.relnamespace = n.oid AND t.tabschema = n.nspname AND
pg_catalog.pg_relation_size(c.oid, 'main') < t.min_size * 1048576;
SELECT squeeze.update_free_space_info();
-- Make the actual decision.
--
-- Ignore tasks having NULL in free_space - those have been created
-- after update_free_space_info() had finished, so the should waite
-- for the next run of dispatch_new_tasks().
UPDATE squeeze.tasks k
SET state =
CASE
WHEN k.free_space >
((100 - squeeze.get_heap_fillfactor(c.oid)) + t.free_space_extra)
THEN 'ready'
ELSE 'processed'
END
FROM squeeze.tables t,
pg_catalog.pg_class c,
pg_catalog.pg_namespace n
WHERE k.state = 'new' AND k.free_space NOTNULL AND k.table_id = t.id
AND t.tabname = c.relname AND c.relnamespace = n.oid AND
t.tabschema = n.nspname;
$$;
CREATE FUNCTION finalize_task(a_task_id int)
RETURNS void
LANGUAGE sql
AS $$
WITH updated(table_id) AS (
UPDATE squeeze.tasks t
SET state = 'processed'
WHERE id = a_task_id
RETURNING table_id
)
UPDATE squeeze.tables_internal t
SET last_task_finished = now()
FROM updated u
WHERE u.table_id = t.table_id;
$$;
CREATE FUNCTION cancel_task(a_task_id int)
RETURNS void
LANGUAGE sql
AS $$
UPDATE squeeze.tasks t
SET state = 'processed'
WHERE id = a_task_id;
$$;
CREATE FUNCTION process_next_task()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_tabschema name;
v_tabname name;
v_cl_index name;
v_rel_tbsp name;
v_ind_tbsps name[][];
v_task_id int;
v_tried int;
v_last_try bool;
v_skip_analyze bool;
v_stmt text;
v_start timestamptz;
-- Error info to be logged.
v_sql_state text;
v_err_msg text;
v_err_detail text;
BEGIN
-- Retrieve the table corresponding to the least recently created task
-- in the 'ready' state.
SELECT tb.tabschema, tb.tabname, tb.clustering_index,
tb.rel_tablespace, tb.ind_tablespaces, t.id, t.tried,
t.tried >= tb.max_retry, tb.skip_analyze
INTO v_tabschema, v_tabname, v_cl_index, v_rel_tbsp, v_ind_tbsps,
v_task_id, v_tried, v_last_try, v_skip_analyze
FROM squeeze.tasks t, squeeze.tables tb
WHERE t.table_id = tb.id AND t.state = 'ready'
ORDER BY t.created
LIMIT 1;
IF NOT FOUND THEN
-- No task currently available?
RETURN;
END IF;
-- Do the actual work.
BEGIN
v_start := clock_timestamp();
-- Do the actual processing.
--
-- If someone dropped the table in between, the exception
-- handler below should log the error and cleanup the task.
PERFORM squeeze.squeeze_table(v_tabschema, v_tabname,
v_cl_index, v_rel_tbsp, v_ind_tbsps);
INSERT INTO squeeze.log(tabschema, tabname, started, finished)
VALUES (v_tabschema, v_tabname, v_start, clock_timestamp());
PERFORM squeeze.finalize_task(v_task_id);
IF NOT v_skip_analyze THEN
-- Analyze the new table, unless user rejects it
-- explicitly.
--
-- XXX Besides updating planner statistics in general,
-- this sets pg_class(relallvisible) to 0, so that
-- planner is not too optimistic about this
-- figure. The preferrable solution would be to run
-- (lazy) VACUUM (with the ANALYZE option) to
-- initialize visibility map. However, to make the
-- effort worthwile, we shouldn't do it until all
-- transactions can see all the changes done by
-- squeeze_table() function. What's the most suitable
-- way to wait? Asynchronous execution of the VACUUM
-- is probably needed in any case.
v_stmt := 'ANALYZE "' || v_tabschema || '"."' ||
v_tabname || '"';
EXECUTE v_stmt;
END IF;
EXCEPTION
WHEN OTHERS THEN
GET STACKED DIAGNOSTICS v_sql_state := RETURNED_SQLSTATE;
GET STACKED DIAGNOSTICS v_err_msg := MESSAGE_TEXT;
GET STACKED DIAGNOSTICS v_err_detail := PG_EXCEPTION_DETAIL;
INSERT INTO squeeze.errors(tabschema, tabname,
sql_state, err_msg, err_detail)
VALUES (v_tabschema, v_tabname, v_sql_state, v_err_msg,
v_err_detail);
-- If the active task failed too many times, cancel
-- it.
IF v_last_try THEN
PERFORM squeeze.cancel_task(v_task_id);
RETURN;
ELSE
-- Account for the current attempt.
UPDATE squeeze.tasks
SET tried = tried + 1
WHERE id = v_task_id;
END IF;
END;
END;
$$;
CREATE FUNCTION stop_worker()
RETURNS SETOF record
LANGUAGE sql
AS $$
-- When looking for the PID we rely on the fact that the worker holds
-- lock on the extension. If the worker is not running, we could (in
-- theory) kill a regular backend trying to ALTER or DROP the
-- extension right now. It's not worth taking a different approach
-- just to avoid this extremely unlikely case (which shouldn't cause
-- data corruption).
SELECT pid, pg_terminate_backend(pid)
FROM pg_catalog.pg_locks l,
pg_catalog.pg_extension e
WHERE e.extname = 'pg_squeeze' AND
(l.classid, l.objid) = (3079, e.oid);
$$;
|