File: pgq_node.register_consumer.sql

package info (click to toggle)
pgq-node 3.5-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 364 kB
  • sloc: sql: 1,412; python: 309; makefile: 14; sh: 1
file content (64 lines) | stat: -rw-r--r-- 2,095 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

create or replace function pgq_node.register_consumer(
    in i_queue_name text,
    in i_consumer_name text,
    in i_provider_node text,
    in i_custom_tick_id int8,
    out ret_code int4,
    out ret_note text)
returns record as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.register_consumer(4)
--
--      Subscribe plain cascaded consumer to a target node.
--      That means it's planning to read from remote node
--      and write to local node.
--
-- Parameters:
--      i_queue_name - set name
--      i_consumer_name - cascaded consumer name
--      i_provider_node - node name
--      i_custom_tick_id - tick id
--
-- Returns:
--      ret_code - error code
--      200 - ok
--      201 - already registered
--      401 - no such queue
--      ret_note - description
-- ----------------------------------------------------------------------
declare
    n record;
    node_wm_name text;
    node_pos bigint;
begin
    select node_type into n
      from pgq_node.node_info where queue_name = i_queue_name
       for update;
    if not found then
        select 404, 'Unknown queue: ' || i_queue_name into ret_code, ret_note;
        return;
    end if;
    perform 1 from pgq_node.local_state
      where queue_name = i_queue_name
        and consumer_name = i_consumer_name;
    if found then
        update pgq_node.local_state
           set provider_node = i_provider_node,
               last_tick_id = i_custom_tick_id
         where queue_name = i_queue_name
           and consumer_name = i_consumer_name;
        select 201, 'Consumer already registered: ' || i_queue_name
               || '/' || i_consumer_name  into ret_code, ret_note;
        return;
    end if;

    insert into pgq_node.local_state (queue_name, consumer_name, provider_node, last_tick_id)
           values (i_queue_name, i_consumer_name, i_provider_node, i_custom_tick_id);

    select 200, 'Consumer '||i_consumer_name||' registered on queue '||i_queue_name
        into ret_code, ret_note;
    return;
end;
$$ language plpgsql security definer;