File: pgq.failed_queue.sql

package info (click to toggle)
skytools 2.1.8-2.2
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 1,980 kB
  • ctags: 1,543
  • sloc: sql: 6,635; python: 6,237; ansic: 2,799; makefile: 308; sh: 268
file content (201 lines) | stat: -rw-r--r-- 5,759 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201

create or replace function pgq.failed_event_list(
    x_queue_name text,
    x_consumer_name text)
returns setof pgq.failed_queue as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_list(2)
--
--      Return every failed event stored for one consumer of one queue,
--      ordered by event id.
--
-- Parameters:
--      x_queue_name        - Queue name
--      x_consumer_name     - Consumer name
--
-- Returns:
--      Set of pgq.failed_queue rows.  The set is empty when no
--      subscription matches the queue/consumer pair or when that
--      subscription owns no failed events.
-- ----------------------------------------------------------------------
declare
    failed_ev pgq.failed_queue%rowtype;
begin
    -- queue + consumer -> subscription -> failed events owned by it
    for failed_ev in
        select fq.*
          from pgq.queue q
               inner join pgq.subscription s
                       on s.sub_queue = q.queue_id
               inner join pgq.consumer c
                       on c.co_id = s.sub_consumer
               inner join pgq.failed_queue fq
                       on fq.ev_owner = s.sub_id
         where q.queue_name = x_queue_name
           and c.co_name = x_consumer_name
         order by fq.ev_id
    loop
        return next failed_ev;
    end loop;
    return;
end;
$$ language plpgsql security definer;

create or replace function pgq.failed_event_list(
    x_queue_name text,
    x_consumer_name text,
    x_count integer,
    x_offset integer)
returns setof pgq.failed_queue as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_list(4)
--
--      Return one page of the failed events stored for one consumer of
--      one queue.  Events are ordered by event id before the
--      limit/offset window is applied.
--
-- Parameters:
--      x_queue_name        - Queue name
--      x_consumer_name     - Consumer name
--      x_count             - Maximum number of events to return
--      x_offset            - Number of events to skip first
--
-- Returns:
--      Set of pgq.failed_queue rows, at most x_count of them.
-- ----------------------------------------------------------------------
declare
    failed_ev pgq.failed_queue%rowtype;
begin
    -- queue + consumer -> subscription -> failed events owned by it
    for failed_ev in
        select fq.*
          from pgq.queue q
               inner join pgq.subscription s
                       on s.sub_queue = q.queue_id
               inner join pgq.consumer c
                       on c.co_id = s.sub_consumer
               inner join pgq.failed_queue fq
                       on fq.ev_owner = s.sub_id
         where q.queue_name = x_queue_name
           and c.co_name = x_consumer_name
         order by fq.ev_id
         limit x_count
        offset x_offset
    loop
        return next failed_ev;
    end loop;
    return;
end;
$$ language plpgsql security definer;

create or replace function pgq.failed_event_count(
    x_queue_name text,
    x_consumer_name text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_count(2)
--
--      Count the failed events stored for one consumer of one queue.
--
-- Parameters:
--      x_queue_name        - Queue name
--      x_consumer_name     - Consumer name
--
-- Returns:
--      Number of rows in pgq.failed_queue owned by the matching
--      subscription; 0 when nothing matches.
-- ----------------------------------------------------------------------
declare
    n_failed integer;
begin
    -- an aggregate without group by always yields exactly one row,
    -- so n_failed is set even when the joins match nothing
    select count(*)
      into n_failed
      from pgq.queue q
           inner join pgq.subscription s
                   on s.sub_queue = q.queue_id
           inner join pgq.consumer c
                   on c.co_id = s.sub_consumer
           inner join pgq.failed_queue fq
                   on fq.ev_owner = s.sub_id
     where q.queue_name = x_queue_name
       and c.co_name = x_consumer_name;

    return n_failed;
end;
$$ language plpgsql security definer;

create or replace function pgq.failed_event_delete(
    x_queue_name text,
    x_consumer_name text,
    x_event_id bigint)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_delete(3)
--
--      Remove one event from the failed event queue of a consumer.
--
-- Parameters:
--      x_queue_name        - Queue name
--      x_consumer_name     - Consumer name
--      x_event_id          - Event ID
--
-- Returns:
--      Always 1 on success.
--
-- Raises:
--      'no such queue/consumer' - no subscription matches the pair
--      'event not found'        - subscription owns no such failed event
-- ----------------------------------------------------------------------
declare
    v_sub_id integer;
begin
    -- resolve the subscription that owns the failed event
    select s.sub_id
      into v_sub_id
      from pgq.queue q
           inner join pgq.subscription s
                   on s.sub_queue = q.queue_id
           inner join pgq.consumer c
                   on c.co_id = s.sub_consumer
     where q.queue_name = x_queue_name
       and c.co_name = x_consumer_name;
    if not found then
        raise exception 'no such queue/consumer';
    end if;

    -- FOUND is false after a delete that removed no rows
    delete from pgq.failed_queue
     where ev_id = x_event_id
       and ev_owner = v_sub_id;
    if not found then
        raise exception 'event not found';
    end if;

    return 1;
end;
$$ language plpgsql security definer;

create or replace function pgq.failed_event_retry(
    x_queue_name text,
    x_consumer_name text,
    x_event_id bigint)
returns bigint as $$
-- ----------------------------------------------------------------------
-- Function: pgq.failed_event_retry(3)
--
--      Re-insert one event from the failed event queue into the main
--      queue, then remove it from the failed event queue.
--
-- Parameters:
--      x_queue_name        - Queue name
--      x_consumer_name     - Consumer name
--      x_event_id          - Event ID
--
-- Returns:
--      The value returned by pgq.insert_event_raw() for the
--      re-inserted event.
--
-- Raises:
--      'no such queue/consumer' - no subscription matches the pair
--      'event not found'        - subscription owns no such failed event
-- ----------------------------------------------------------------------
declare
    ret         bigint;
    x_sub_id    integer;
begin
    -- resolve the subscription that owns the failed event
    select s.sub_id
      into x_sub_id
      from pgq.queue q
           inner join pgq.subscription s
                   on s.sub_queue = q.queue_id
           inner join pgq.consumer c
                   on c.co_id = s.sub_consumer
     where q.queue_name = x_queue_name
       and c.co_name = x_consumer_name;
    if not found then
        raise exception 'no such queue/consumer';
    end if;

    -- Only pgq.failed_queue is read here.  Listing pgq.consumer and
    -- pgq.queue in this FROM clause without join conditions would
    -- cross-join the event against every row of both tables, making
    -- the result set (and potentially the number of insert_event_raw
    -- evaluations) depend on the size of unrelated tables.
    select pgq.insert_event_raw(x_queue_name, fq.ev_id, fq.ev_time,
            fq.ev_owner, fq.ev_retry, fq.ev_type, fq.ev_data,
            fq.ev_extra1, fq.ev_extra2, fq.ev_extra3, fq.ev_extra4)
      into ret
      from pgq.failed_queue fq
     where fq.ev_owner = x_sub_id
       and fq.ev_id = x_event_id;
    if not found then
        raise exception 'event not found';
    end if;

    -- the event is back in the main queue; drop the failed copy
    perform pgq.failed_event_delete(x_queue_name, x_consumer_name, x_event_id);

    return ret;
end;
$$ language plpgsql security definer;