1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
create or replace function pgq.ticker(i_queue_name text, i_tick_id bigint, i_orig_timestamp timestamptz, i_event_seq bigint)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(4)
--
--      External ticker: store a tick whose id, timestamp and event
--      sequence value are all supplied by the caller.
--
--      The tick row is written only for a queue that is marked as
--      having an external ticker and whose ticker is not paused.
--      Afterwards the queue's tick sequence and event sequence are
--      passed to pgq.seq_setval() together with the supplied values.
--
-- Parameters:
--      i_queue_name        - Name of the queue
--      i_tick_id           - Id of the new tick
--      i_orig_timestamp    - Timestamp stored as the tick time
--      i_event_seq         - Event sequence value stored with the tick
--
-- Returns:
--      i_tick_id, unchanged.
--
-- Raises:
--      An exception when no queue with that name exists, or the queue
--      does not use an external ticker, or its ticker is paused.
-- ----------------------------------------------------------------------
begin
    -- The queue lookup and the eligibility checks are folded into the
    -- insert itself: no matching queue row means no tick row.
    insert into pgq.tick (tick_queue, tick_id, tick_time, tick_event_seq)
        select q.queue_id, i_tick_id, i_orig_timestamp, i_event_seq
          from pgq.queue q
         where q.queue_name = i_queue_name
           and q.queue_external_ticker
           and not q.queue_ticker_paused;

    if not found then
        raise exception 'queue not found or ticker disabled: %', i_queue_name;
    end if;

    -- make sure seqs stay current
    perform pgq.seq_setval(q.queue_tick_seq, i_tick_id),
            pgq.seq_setval(q.queue_event_seq, i_event_seq)
       from pgq.queue q
      where q.queue_name = i_queue_name;

    return i_tick_id;
end;
$$ language plpgsql security definer; -- unsure about access
create or replace function pgq.ticker(i_queue_name text)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(1)
--
--      Check if tick is needed for the queue and insert it.
--
--      For pgqadm usage.
--
--      Decision rules, relative to the most recent tick of the queue:
--        * no previous tick          - always tick.
--        * new events since then     - tick once the number of new events
--                                      reaches queue_ticker_max_count or
--                                      the time since the last tick
--                                      reaches queue_ticker_max_lag.
--        * no new events             - tick less and less often; see the
--                                      idle-period logic in the body.
--
-- Parameters:
--      i_queue_name     - Name of the queue
--
-- Returns:
--      Tick id or NULL if no tick was done.
--
-- Raises:
--      An exception when the queue does not exist, uses an external
--      tick source, has its ticker paused, or when the last tick's
--      snapshot xmin is ahead of the current transaction id.
-- ----------------------------------------------------------------------
declare
    res         bigint;     -- id of the tick created by this call
    cur_txid    bigint;     -- txid_current(), read once for the sanity checks
    q           record;     -- queue configuration + current event seq value
    state       record;     -- most recent tick of the queue
    last2       record;     -- interval between the two most recent ticks
begin
    select queue_id, queue_tick_seq, queue_external_ticker,
           queue_ticker_max_count, queue_ticker_max_lag,
           queue_ticker_idle_period, queue_event_seq,
           pgq.seq_getval(queue_event_seq) as event_seq,
           queue_ticker_paused
      into q
      from pgq.queue
     where queue_name = i_queue_name;
    if not found then
        raise exception 'no such queue';
    end if;

    if q.queue_external_ticker then
        raise exception 'This queue has external tick source.';
    end if;

    if q.queue_ticker_paused then
        raise exception 'Ticker has been paused for this queue';
    end if;

    -- load state from last tick
    select now() - tick_time as lag,
           q.event_seq - tick_event_seq as new_events,
           tick_id, tick_time, tick_event_seq,
           txid_snapshot_xmax(tick_snapshot) as sxmax,
           txid_snapshot_xmin(tick_snapshot) as sxmin
      into state
      from pgq.tick
     where tick_queue = q.queue_id
     order by tick_queue desc, tick_id desc
     limit 1;

    if found then
        cur_txid := txid_current();

        -- sanity checks against the previous tick's snapshot
        if state.sxmin > cur_txid then
            raise exception 'Invalid PgQ state: old xmin=%, old xmax=%, cur txid=%',
                            state.sxmin, state.sxmax, cur_txid;
        end if;
        if state.new_events < 0 then
            raise warning 'Negative new_events?  old=% cur=%', state.tick_event_seq, q.event_seq;
        end if;
        if state.sxmax > cur_txid then
            raise warning 'Dubious PgQ state: old xmax=%, cur txid=%', state.sxmax, cur_txid;
        end if;

        if state.new_events > 0 then
            -- there are new events, should we wait a bit?
            if state.new_events < q.queue_ticker_max_count
               and state.lag < q.queue_ticker_max_lag
            then
                return NULL;
            end if;
        else
            -- no new events, should we apply idle period?
            -- look at the tick before the last one: last2.lag is the
            -- interval between the two most recent ticks.
            select state.tick_time - tick_time as lag
              into last2
              from pgq.tick
             where tick_queue = q.queue_id
               and tick_id < state.tick_id
             order by tick_queue desc, tick_id desc
             limit 1;
            if found then
                -- gradually decrease the tick frequency: skip the tick
                -- while less than half of max_lag has passed, or while
                -- the time since the last tick is still below both
                -- twice the previous interval and the idle period.
                if (state.lag < q.queue_ticker_max_lag / 2)
                   or
                   (state.lag < last2.lag * 2
                    and state.lag < q.queue_ticker_idle_period)
                then
                    return NULL;
                end if;
            end if;
        end if;
    end if;

    -- Allocate the tick id once and reuse it for both the insert and
    -- the return value, instead of reading it back with currval().
    res := nextval(q.queue_tick_seq);

    -- Columns not listed here take whatever defaults pgq.tick defines.
    insert into pgq.tick (tick_queue, tick_id, tick_event_seq)
        values (q.queue_id, res, q.event_seq);

    return res;
end;
$$ language plpgsql security definer; -- unsure about access
create or replace function pgq.ticker() returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.ticker(0)
--
--      Runs pgq.ticker(queue_name) for every queue that is neither
--      paused nor driven by an external ticker, in queue name order.
--
-- Returns:
--      Number of queues for which pgq.ticker(queue_name) returned a
--      value greater than zero.
-- ----------------------------------------------------------------------
declare
    n_ticked    bigint := 0;
    queue_row   record;
begin
    for queue_row in
        select qu.queue_name
          from pgq.queue qu
         where not (qu.queue_external_ticker or qu.queue_ticker_paused)
         order by qu.queue_name
    loop
        -- A NULL result makes the comparison NULL, which the IF treats
        -- as not true, so that queue is not counted.
        if pgq.ticker(queue_row.queue_name) > 0 then
            n_ticked := n_ticked + 1;
        end if;
    end loop;

    return n_ticked;
end;
$$ language plpgsql security definer;
|