1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
begin;
create or replace function pgq.maint_rotate_tables_step1(i_queue_name text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.maint_rotate_tables_step1(1)
--
-- Rotate tables for one queue.
--
-- Parameters:
-- i_queue_name - Name of the queue
--
-- Returns:
-- 1 if rotation happened, otherwise 0.
-- ----------------------------------------------------------------------
declare
badcnt integer;
cf record;
nr integer;
tbl text;
lowest_tick_id int8;
lowest_xmin int8;
begin
-- check if needed and load record
select * from pgq.queue into cf
where queue_name = i_queue_name
and queue_rotation_period is not null
and queue_switch_step2 is not null
and queue_switch_time + queue_rotation_period < current_timestamp
for update;
if not found then
return 0;
end if;
-- find lowest tick for that queue
select min(sub_last_tick) into lowest_tick_id
from pgq.subscription
where sub_queue = cf.queue_id;
-- if some consumer exists
if lowest_tick_id is not null then
-- is the slowest one still on previous table?
select txid_snapshot_xmin(tick_snapshot) into lowest_xmin
from pgq.tick
where tick_queue = cf.queue_id
and tick_id = lowest_tick_id;
if lowest_xmin <= cf.queue_switch_step2 then
return 0; -- skip rotation then
end if;
end if;
-- nobody on previous table, we can rotate
-- calc next table number and name
nr := cf.queue_cur_table + 1;
if nr = cf.queue_ntables then
nr := 0;
end if;
tbl := cf.queue_data_pfx || '_' || nr;
-- there may be long lock on the table from pg_dump,
-- detect it and skip rotate then
begin
execute 'lock table ' || tbl || ' nowait';
execute 'truncate ' || tbl;
exception
when lock_not_available then
-- cannot truncate, skipping rotate
return 0;
end;
-- remember the moment
update pgq.queue
set queue_cur_table = nr,
queue_switch_time = current_timestamp,
queue_switch_step1 = txid_current(),
queue_switch_step2 = NULL
where queue_id = cf.queue_id;
-- Clean ticks by using step2 txid from previous rotation.
-- That should keep all ticks for all batches that are completely
-- in old table. This keeps them for longer than needed, but:
-- 1. we want the pgq.tick table to be big, to avoid Postgres
-- accitentally switching to seqscans on that.
-- 2. that way we guarantee to consumers that they an be moved
-- back on the queue at least for one rotation_period.
-- (may help in disaster recovery)
delete from pgq.tick
where tick_queue = cf.queue_id
and txid_snapshot_xmin(tick_snapshot) < cf.queue_switch_step2;
return 1;
end;
$$ language plpgsql; -- need admin access
create or replace function pgq.maint_rotate_tables_step2()
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.maint_rotate_tables_step2(0)
--
-- Stores the txid when the rotation was visible. It should be
-- called in separate transaction than pgq.maint_rotate_tables_step1()
-- ----------------------------------------------------------------------
begin
update pgq.queue
set queue_switch_step2 = txid_current()
where queue_switch_step2 is null;
return 1;
end;
$$ language plpgsql; -- need admin access
create or replace function pgq.version()
returns text as $$
-- ----------------------------------------------------------------------
-- Function: pgq.version(0)
--
-- Returns verison string for pgq. ATM its SkyTools version
-- that is only bumped when PGQ database code changes.
-- ----------------------------------------------------------------------
begin
return '2.1.8';
end;
$$ language plpgsql;
end;
|