1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
|
create or replace function pgq_node.promote_branch(
    in i_queue_name text,
    out ret_code int4,
    out ret_note text)
as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.promote_branch(1)
--
--      Promote branch node to root.
--
--      The node_info row for the queue is locked (FOR UPDATE) for the
--      rest of the transaction.  On success the function:
--        * clears queue_disable_insert and queue_external_ticker
--          on the pgq.queue row,
--        * passes the highest existing tick id of the queue to
--          pgq.seq_setval() for the queue's tick sequence,
--        * sets node_type to 'root' in pgq_node.node_info,
--        * points the worker's pgq_node.local_state row at this node
--          itself (provider_node = own node name), stores the last
--          tick id there and marks it as not up to date.
--
--      Runs as SECURITY DEFINER; every table and function reference
--      below is schema-qualified.
--
-- Parameters:
--      i_queue_name    - queue name
--
-- Returns:
--      200 - success
--      404 - node not initialized for queue
--      301 - node is not branch
--
-- Tables directly manipulated:
--      update - pgq.queue
--      update - pgq_node.node_info
--      update - pgq_node.local_state
-- ----------------------------------------------------------------------
declare
    n_name      text;       -- this node's own name
    n_type      text;       -- current node type, must be 'branch'
    w_name      text;       -- worker (consumer) name registered for the node
    last_tick   bigint;     -- highest tick id currently stored for the queue
begin
    -- lock the node row so the type cannot change underneath us
    select node_name, node_type, worker_name into n_name, n_type, w_name
        from pgq_node.node_info
        where queue_name = i_queue_name
        for update;
    if not found then
        select 404, 'Node not initialized for queue: ' || i_queue_name
            into ret_code, ret_note;
        return;
    end if;

    if n_type != 'branch' then
        select 301, 'Node not branch'
            into ret_code, ret_note;
        return;
    end if;

    -- allow local inserts and stop relying on an external ticker
    update pgq.queue
        set queue_disable_insert = false,
            queue_external_ticker = false
        where queue_name = i_queue_name;

    -- find the latest tick of this queue
    -- NOTE(review): when the queue has no tick rows, last_tick stays NULL
    -- and is passed on unchanged below -- confirm pgq.seq_setval() and
    -- local_state.last_tick_id are fine with NULL.
    select t.tick_id into last_tick
        from pgq.queue q
        inner join pgq.tick t
            on t.tick_queue = q.queue_id
        where q.queue_name = i_queue_name
        order by t.tick_queue desc, t.tick_id desc
        limit 1;

    -- make tick seq larger than last tick
    perform pgq.seq_setval(queue_tick_seq, last_tick)
        from pgq.queue
        where queue_name = i_queue_name;

    -- change type, point worker to itself
    update pgq_node.node_info
        set node_type = 'root'
        where queue_name = i_queue_name;

    update pgq_node.local_state
        set provider_node = n_name,
            last_tick_id = last_tick,
            uptodate = false
        where queue_name = i_queue_name
            and consumer_name = w_name;

    select 200, 'Branch node promoted to root'
        into ret_code, ret_note;
    return;
end;
$$ language plpgsql security definer;
|