1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
|
create or replace function pgq.failed_event_list(
    x_queue_name text,
    x_consumer_name text)
returns setof pgq.failed_queue as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_list(2)
--
--      Return every failed event stored for one consumer of one queue,
--      ordered by event id.
--
-- Parameters:
--      x_queue_name        - Name of the queue
--      x_consumer_name     - Name of the consumer subscribed to that queue
--
-- Returns:
--      One pgq.failed_queue row per failed event.  The set is empty when
--      no rows match the given queue/consumer pair.
-- ----------------------------------------------------------------------
declare
    failed_ev   pgq.failed_queue%rowtype;
begin
    -- Resolve queue + consumer to their subscription, then collect the
    -- failed events owned by that subscription.
    for failed_ev in
        select fq.*
          from pgq.queue q
         inner join pgq.subscription s
            on s.sub_queue = q.queue_id
         inner join pgq.consumer c
            on c.co_id = s.sub_consumer
         inner join pgq.failed_queue fq
            on fq.ev_owner = s.sub_id
         where q.queue_name = x_queue_name
           and c.co_name = x_consumer_name
         order by fq.ev_id
    loop
        return next failed_ev;
    end loop;

    return;
end;
$$ language plpgsql security definer;
create or replace function pgq.failed_event_list(
    x_queue_name text,
    x_consumer_name text,
    x_count integer,
    x_offset integer)
returns setof pgq.failed_queue as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_list(4)
--
--      Return one page of the failed events stored for a consumer of a
--      queue.  Events are ordered by event id before the page is cut.
--
-- Parameters:
--      x_queue_name        - Name of the queue
--      x_consumer_name     - Name of the consumer subscribed to that queue
--      x_count             - Maximum number of events to return (LIMIT)
--      x_offset            - Number of leading events to skip (OFFSET)
--
-- Returns:
--      Up to x_count pgq.failed_queue rows, starting after the first
--      x_offset rows of the ordered list.
-- ----------------------------------------------------------------------
declare
    failed_ev   pgq.failed_queue%rowtype;
begin
    -- Same lookup as the 2-argument variant, with LIMIT/OFFSET applied
    -- on top of the ev_id ordering.
    for failed_ev in
        select fq.*
          from pgq.queue q
         inner join pgq.subscription s
            on s.sub_queue = q.queue_id
         inner join pgq.consumer c
            on c.co_id = s.sub_consumer
         inner join pgq.failed_queue fq
            on fq.ev_owner = s.sub_id
         where q.queue_name = x_queue_name
           and c.co_name = x_consumer_name
         order by fq.ev_id
         limit x_count
        offset x_offset
    loop
        return next failed_ev;
    end loop;

    return;
end;
$$ language plpgsql security definer;
create or replace function pgq.failed_event_count(
    x_queue_name text,
    x_consumer_name text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_count(2)
--
--      Count the failed events stored for one consumer of one queue.
--
-- Parameters:
--      x_queue_name        - Name of the queue
--      x_consumer_name     - Name of the consumer subscribed to that queue
--
-- Returns:
--      Number of rows in pgq.failed_queue owned by the matching
--      subscription; 0 when nothing matches.
-- ----------------------------------------------------------------------
declare
    n_failed    integer;
begin
    -- An aggregate without GROUP BY always yields one row, so n_failed
    -- is 0 (not NULL) when the queue/consumer pair matches nothing.
    select count(*)
      into n_failed
      from pgq.queue q
     inner join pgq.subscription s
        on s.sub_queue = q.queue_id
     inner join pgq.consumer c
        on c.co_id = s.sub_consumer
     inner join pgq.failed_queue fq
        on fq.ev_owner = s.sub_id
     where q.queue_name = x_queue_name
       and c.co_name = x_consumer_name;

    return n_failed;
end;
$$ language plpgsql security definer;
create or replace function pgq.failed_event_delete(
    x_queue_name text,
    x_consumer_name text,
    x_event_id bigint)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_delete(3)
--
--      Remove a single event from the failed event queue of one
--      consumer.
--
-- Parameters:
--      x_queue_name        - Name of the queue
--      x_consumer_name     - Name of the consumer subscribed to that queue
--      x_event_id          - Id of the failed event to remove
--
-- Returns:
--      1 when the event was deleted.
--
-- Raises:
--      'no such queue/consumer' - no subscription matches the given
--                                 queue and consumer names
--      'event not found'        - the subscription has no failed event
--                                 with the given id
-- ----------------------------------------------------------------------
declare
    owner_sub_id    integer;
begin
    -- Failed events are keyed by subscription id, so translate the
    -- queue/consumer names into that id first.
    select s.sub_id
      into owner_sub_id
      from pgq.subscription s
     inner join pgq.consumer c
        on c.co_id = s.sub_consumer
     inner join pgq.queue q
        on q.queue_id = s.sub_queue
     where q.queue_name = x_queue_name
       and c.co_name = x_consumer_name;
    if not found then
        raise exception 'no such queue/consumer';
    end if;

    delete from pgq.failed_queue
     where ev_owner = owner_sub_id
       and ev_id = x_event_id;
    -- FOUND now reflects whether the DELETE touched any row.
    if not found then
        raise exception 'event not found';
    end if;

    return 1;
end;
$$ language plpgsql security definer;
create or replace function pgq.failed_event_retry(
    x_queue_name text,
    x_consumer_name text,
    x_event_id bigint)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_retry(3)
--
--      Move one event from the failed event queue back into the main
--      queue: the event is re-inserted through pgq.insert_event_raw()
--      with its stored ev_id, ev_time, ev_owner, ev_retry, ev_type,
--      ev_data and ev_extra1..4 values, then removed from the failed
--      queue via pgq.failed_event_delete().
--
-- Parameters:
--      x_queue_name        - Name of the queue
--      x_consumer_name     - Name of the consumer subscribed to that queue
--      x_event_id          - Id of the failed event to retry
--
-- Returns:
--      The value returned by pgq.insert_event_raw() for the re-inserted
--      event (presumably its event id - confirm against that function).
--
-- Raises:
--      'no such queue/consumer' - no subscription matches the given
--                                 queue and consumer names
--      'event not found'        - the subscription has no failed event
--                                 with the given id
-- ----------------------------------------------------------------------
declare
    ret         bigint;
    x_sub_id    integer;
begin
    -- Failed events are keyed by subscription id, so translate the
    -- queue/consumer names into that id first.
    select s.sub_id
      into x_sub_id
      from pgq.subscription s
     inner join pgq.consumer c
        on c.co_id = s.sub_consumer
     inner join pgq.queue q
        on q.queue_id = s.sub_queue
     where q.queue_name = x_queue_name
       and c.co_name = x_consumer_name;
    if not found then
        raise exception 'no such queue/consumer';
    end if;

    -- Only pgq.failed_queue is read here.  Listing pgq.consumer and
    -- pgq.queue in this FROM clause without join conditions would
    -- cross-join the event row with every consumer and queue row.
    select pgq.insert_event_raw(x_queue_name, fq.ev_id, fq.ev_time,
                fq.ev_owner, fq.ev_retry, fq.ev_type, fq.ev_data,
                fq.ev_extra1, fq.ev_extra2, fq.ev_extra3, fq.ev_extra4)
      into ret
      from pgq.failed_queue fq
     where fq.ev_owner = x_sub_id
       and fq.ev_id = x_event_id;
    if not found then
        raise exception 'event not found';
    end if;

    -- The event is back in the main queue; drop the failed copy.
    perform pgq.failed_event_delete(x_queue_name, x_consumer_name, x_event_id);

    return ret;
end;
$$ language plpgsql security definer;
|