1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
|
create or replace function pgq.next_batch_info(
in i_queue_name text,
in i_consumer_name text,
out batch_id int8,
out cur_tick_id int8,
out prev_tick_id int8,
out cur_tick_time timestamptz,
out prev_tick_time timestamptz,
out cur_tick_event_seq int8,
out prev_tick_event_seq int8)
as $$
-- ----------------------------------------------------------------------
-- Function: pgq.next_batch_info(2)
--
-- Makes next block of events active.
--
-- If it returns NULL, there is no events available in queue.
-- Consumer should sleep then.
--
-- The values from event_id sequence may give hint how big the
-- batch may be. But they are inexact, they do not give exact size.
-- Client *MUST NOT* use them to detect whether the batch contains any
-- events at all - the values are unfit for that purpose.
--
-- Parameters:
-- i_queue_name - Name of the queue
-- i_consumer_name - Name of the consumer
--
-- Returns:
-- batch_id - Batch ID or NULL if there are no more events available.
-- cur_tick_id - End tick id.
-- cur_tick_time - End tick time.
-- cur_tick_event_seq - Value from event id sequence at the time tick was issued.
-- prev_tick_id - Start tick id.
-- prev_tick_time - Start tick time.
-- prev_tick_event_seq - value from event id sequence at the time tick was issued.
-- Calls:
-- pgq.next_batch_custom(5)
-- Tables directly manipulated:
-- None
-- ----------------------------------------------------------------------
begin
select f.batch_id, f.cur_tick_id, f.prev_tick_id,
f.cur_tick_time, f.prev_tick_time,
f.cur_tick_event_seq, f.prev_tick_event_seq
into batch_id, cur_tick_id, prev_tick_id, cur_tick_time, prev_tick_time,
cur_tick_event_seq, prev_tick_event_seq
from pgq.next_batch_custom(i_queue_name, i_consumer_name, NULL, NULL, NULL) f;
return;
end;
$$ language plpgsql;
create or replace function pgq.next_batch(
in i_queue_name text,
in i_consumer_name text)
returns int8 as $$
-- ----------------------------------------------------------------------
-- Function: pgq.next_batch(2)
--
-- Old function that returns just batch_id.
--
-- Parameters:
-- i_queue_name - Name of the queue
-- i_consumer_name - Name of the consumer
--
-- Returns:
-- Batch ID or NULL if there are no more events available.
-- ----------------------------------------------------------------------
declare
res int8;
begin
select batch_id into res
from pgq.next_batch_info(i_queue_name, i_consumer_name);
return res;
end;
$$ language plpgsql;
create or replace function pgq.next_batch_custom(
in i_queue_name text,
in i_consumer_name text,
in i_min_lag interval,
in i_min_count int4,
in i_min_interval interval,
out batch_id int8,
out cur_tick_id int8,
out prev_tick_id int8,
out cur_tick_time timestamptz,
out prev_tick_time timestamptz,
out cur_tick_event_seq int8,
out prev_tick_event_seq int8)
as $$
-- ----------------------------------------------------------------------
-- Function: pgq.next_batch_custom(5)
--
-- Makes next block of events active. Block size can be tuned
-- with i_min_count, i_min_interval parameters. Events age can
-- be tuned with i_min_lag.
--
-- If it returns NULL, there is no events available in queue.
-- Consumer should sleep then.
--
-- The values from event_id sequence may give hint how big the
-- batch may be. But they are inexact, they do not give exact size.
-- Client *MUST NOT* use them to detect whether the batch contains any
-- events at all - the values are unfit for that purpose.
--
-- Note:
-- i_min_lag together with i_min_interval/i_min_count is inefficient.
--
-- Parameters:
-- i_queue_name - Name of the queue
-- i_consumer_name - Name of the consumer
-- i_min_lag - Consumer wants events older than that
-- i_min_count - Consumer wants batch to contain at least this many events
-- i_min_interval - Consumer wants batch to cover at least this much time
--
-- Returns:
-- batch_id - Batch ID or NULL if there are no more events available.
-- cur_tick_id - End tick id.
-- cur_tick_time - End tick time.
-- cur_tick_event_seq - Value from event id sequence at the time tick was issued.
-- prev_tick_id - Start tick id.
-- prev_tick_time - Start tick time.
-- prev_tick_event_seq - value from event id sequence at the time tick was issued.
-- Calls:
-- pgq.insert_event_raw(11)
-- Tables directly manipulated:
-- update - pgq.subscription
-- ----------------------------------------------------------------------
declare
errmsg text;
queue_id integer;
sub_id integer;
cons_id integer;
begin
select s.sub_queue, s.sub_consumer, s.sub_id, s.sub_batch,
t1.tick_id, t1.tick_time, t1.tick_event_seq,
t2.tick_id, t2.tick_time, t2.tick_event_seq
into queue_id, cons_id, sub_id, batch_id,
prev_tick_id, prev_tick_time, prev_tick_event_seq,
cur_tick_id, cur_tick_time, cur_tick_event_seq
from pgq.consumer c,
pgq.queue q,
pgq.subscription s
left join pgq.tick t1
on (t1.tick_queue = s.sub_queue
and t1.tick_id = s.sub_last_tick)
left join pgq.tick t2
on (t2.tick_queue = s.sub_queue
and t2.tick_id = s.sub_next_tick)
where q.queue_name = i_queue_name
and c.co_name = i_consumer_name
and s.sub_queue = q.queue_id
and s.sub_consumer = c.co_id;
if not found then
errmsg := 'Not subscriber to queue: '
|| coalesce(i_queue_name, 'NULL')
|| '/'
|| coalesce(i_consumer_name, 'NULL');
raise exception '%', errmsg;
end if;
-- sanity check
if prev_tick_id is null then
raise exception 'PgQ corruption: Consumer % on queue % does not see tick %', i_consumer_name, i_queue_name, prev_tick_id;
end if;
-- has already active batch
if batch_id is not null then
return;
end if;
if i_min_interval is null and i_min_count is null then
-- find next tick
select tick_id, tick_time, tick_event_seq
into cur_tick_id, cur_tick_time, cur_tick_event_seq
from pgq.tick
where tick_id > prev_tick_id
and tick_queue = queue_id
order by tick_queue asc, tick_id asc
limit 1;
else
-- find custom tick
select next_tick_id, next_tick_time, next_tick_seq
into cur_tick_id, cur_tick_time, cur_tick_event_seq
from pgq.find_tick_helper(queue_id, prev_tick_id,
prev_tick_time, prev_tick_event_seq,
i_min_count, i_min_interval);
end if;
if i_min_lag is not null then
-- enforce min lag
if now() - cur_tick_time < i_min_lag then
cur_tick_id := NULL;
cur_tick_time := NULL;
cur_tick_event_seq := NULL;
end if;
end if;
if cur_tick_id is null then
-- nothing to do
prev_tick_id := null;
prev_tick_time := null;
prev_tick_event_seq := null;
return;
end if;
-- get next batch
batch_id := nextval('pgq.batch_id_seq');
update pgq.subscription
set sub_batch = batch_id,
sub_next_tick = cur_tick_id,
sub_active = now()
where sub_queue = queue_id
and sub_consumer = cons_id;
return;
end;
$$ language plpgsql security definer;
|