1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
|
create or replace function pgq_node.register_consumer(
in i_queue_name text,
in i_consumer_name text,
in i_provider_node text,
in i_custom_tick_id int8,
out ret_code int4,
out ret_note text)
returns record as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.register_consumer(4)
--
-- Subscribe plain cascaded consumer to a target node.
-- That means it's planning to read from remote node
-- and write to local node.
--
-- Parameters:
-- i_queue_name - set name
-- i_consumer_name - cascaded consumer name
-- i_provider_node - node name
-- i_custom_tick_id - tick id
--
-- Returns:
-- ret_code - error code
-- 200 - ok
-- 201 - already registered
-- 401 - no such queue
-- ret_note - description
-- ----------------------------------------------------------------------
declare
n record;
node_wm_name text;
node_pos bigint;
begin
select node_type into n
from pgq_node.node_info where queue_name = i_queue_name
for update;
if not found then
select 404, 'Unknown queue: ' || i_queue_name into ret_code, ret_note;
return;
end if;
perform 1 from pgq_node.local_state
where queue_name = i_queue_name
and consumer_name = i_consumer_name;
if found then
update pgq_node.local_state
set provider_node = i_provider_node,
last_tick_id = i_custom_tick_id
where queue_name = i_queue_name
and consumer_name = i_consumer_name;
select 201, 'Consumer already registered: ' || i_queue_name
|| '/' || i_consumer_name into ret_code, ret_note;
return;
end if;
insert into pgq_node.local_state (queue_name, consumer_name, provider_node, last_tick_id)
values (i_queue_name, i_consumer_name, i_provider_node, i_custom_tick_id);
select 200, 'Consumer '||i_consumer_name||' registered on queue '||i_queue_name
into ret_code, ret_note;
return;
end;
$$ language plpgsql security definer;
|