1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
begin;
create or replace function pgq.maint_retry_events()
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.maint_retry_events(0)
--
-- Moves retry events back to main queue.
--
-- It moves small amount at a time. It should be called
-- until it returns 0
--
-- Returns:
-- Number of events processed.
-- ----------------------------------------------------------------------
declare
cnt integer;
rec record;
begin
cnt := 0;
for rec in
select queue_name,
ev_id, ev_time, ev_owner, ev_retry, ev_type, ev_data,
ev_extra1, ev_extra2, ev_extra3, ev_extra4
from pgq.retry_queue, pgq.queue, pgq.subscription
where ev_retry_after <= current_timestamp
and sub_id = ev_owner
and queue_id = sub_queue
order by ev_retry_after
limit 10
loop
cnt := cnt + 1;
perform pgq.insert_event_raw(rec.queue_name,
rec.ev_id, rec.ev_time, rec.ev_owner, rec.ev_retry,
rec.ev_type, rec.ev_data, rec.ev_extra1, rec.ev_extra2,
rec.ev_extra3, rec.ev_extra4);
delete from pgq.retry_queue
where ev_owner = rec.ev_owner
and ev_id = rec.ev_id;
end loop;
return cnt;
end;
$$ language plpgsql; -- need admin access
create or replace function pgq.version()
returns text as $$
-- ----------------------------------------------------------------------
-- Function: pgq.version(0)
--
-- Returns verison string for pgq. ATM its SkyTools version
-- that is only bumped when PGQ database code changes.
-- ----------------------------------------------------------------------
begin
return '2.1.7';
end;
$$ language plpgsql;
end;
|