1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
create or replace function pgq.register_consumer(
x_queue_name text,
x_consumer_id text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.register_consumer(2)
--
-- Subscribe consumer on a queue.
--
-- From this moment forward, consumer will see all events in the queue.
--
-- Parameters:
-- x_queue_name - Name of queue
-- x_consumer_name - Name of consumer
--
-- Returns:
-- 0 - if already registered
-- 1 - if new registration
-- Calls:
-- pgq.register_consumer_at(3)
-- Tables directly manipulated:
-- None
-- ----------------------------------------------------------------------
begin
return pgq.register_consumer_at(x_queue_name, x_consumer_id, NULL);
end;
$$ language plpgsql security definer;
create or replace function pgq.register_consumer_at(
x_queue_name text,
x_consumer_name text,
x_tick_pos bigint)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.register_consumer_at(3)
--
-- Extended registration, allows to specify tick_id.
--
-- Note:
-- For usage in special situations.
--
-- Parameters:
-- x_queue_name - Name of a queue
-- x_consumer_name - Name of consumer
-- x_tick_pos - Tick ID
--
-- Returns:
-- 0/1 whether consumer has already registered.
-- Calls:
-- None
-- Tables directly manipulated:
-- update/insert - pgq.subscription
-- ----------------------------------------------------------------------
declare
tmp text;
last_tick bigint;
x_queue_id integer;
x_consumer_id integer;
queue integer;
sub record;
begin
select queue_id into x_queue_id from pgq.queue
where queue_name = x_queue_name;
if not found then
raise exception 'Event queue not created yet';
end if;
-- get consumer and create if new
select co_id into x_consumer_id from pgq.consumer
where co_name = x_consumer_name
for update;
if not found then
insert into pgq.consumer (co_name) values (x_consumer_name);
x_consumer_id := currval('pgq.consumer_co_id_seq');
end if;
-- if particular tick was requested, check if it exists
if x_tick_pos is not null then
perform 1 from pgq.tick
where tick_queue = x_queue_id
and tick_id = x_tick_pos;
if not found then
raise exception 'cannot reposition, tick not found: %', x_tick_pos;
end if;
end if;
-- check if already registered
select sub_last_tick, sub_batch into sub
from pgq.subscription
where sub_consumer = x_consumer_id
and sub_queue = x_queue_id;
if found then
if x_tick_pos is not null then
-- if requested, update tick pos and drop partial batch
update pgq.subscription
set sub_last_tick = x_tick_pos,
sub_batch = null,
sub_next_tick = null,
sub_active = now()
where sub_consumer = x_consumer_id
and sub_queue = x_queue_id;
end if;
-- already registered
return 0;
end if;
-- new registration
if x_tick_pos is null then
-- start from current tick
select tick_id into last_tick from pgq.tick
where tick_queue = x_queue_id
order by tick_queue desc, tick_id desc
limit 1;
if not found then
raise exception 'No ticks for this queue. Please run ticker on database.';
end if;
else
last_tick := x_tick_pos;
end if;
-- register
insert into pgq.subscription (sub_queue, sub_consumer, sub_last_tick)
values (x_queue_id, x_consumer_id, last_tick);
return 1;
end;
$$ language plpgsql security definer;
|