File: pgq_node.register_subscriber.sql

package info (click to toggle)
pgq-node 3.5-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 364 kB
  • sloc: sql: 1,412; python: 309; makefile: 14; sh: 1
file content (63 lines) | stat: -rw-r--r-- 2,138 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

create or replace function pgq_node.register_subscriber(
    in i_queue_name text,
    in i_remote_node_name text,
    in i_remote_worker_name text,
    in i_custom_tick_id int8,
    out ret_code int4,
    out ret_note text,
    out global_watermark bigint)
returns record as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.register_subscriber(4)
--
--      Register a remote node as a subscriber of the local node,
--      optionally starting its worker at a caller-supplied tick.
--      Should be used when changing provider for existing node.
--
-- Parameters:
--      i_queue_name - queue (set) name
--      i_remote_node_name - name of the subscribing node
--      i_remote_worker_name - consumer name of the remote worker
--      i_custom_tick_id - tick id for the worker to start from [optional];
--                         when NULL the global watermark is used
--
-- Returns:
--      ret_code - 200 on success,
--                 404 when the queue has no row in pgq_node.node_info,
--                 401 when the local node is neither 'root' nor 'branch'
--      ret_note - description
--      global_watermark - last_tick of the '.global_watermark' consumer;
--                         not set when ret_code is 404
-- ----------------------------------------------------------------------
declare
    v_node_type     pgq_node.node_info.node_type%type;
    v_wm_consumer   text;
begin
    -- Fetch the local node type, taking a row lock on the node_info row.
    select ni.node_type
      into v_node_type
      from pgq_node.node_info ni
     where ni.queue_name = i_queue_name
       for update;

    if not found then
        ret_code := 404;
        ret_note := 'Unknown queue: ' || i_queue_name;
        return;
    end if;

    -- The watermark is looked up before the node type check, so it is
    -- also reported back to the caller on the 401 path.
    select ci.last_tick
      into global_watermark
      from pgq.get_consumer_info(i_queue_name, '.global_watermark') ci;

    if v_node_type <> 'root' and v_node_type <> 'branch' then
        ret_code := 401;
        ret_note := 'Cannot subscribe to ' || v_node_type || ' node';
        return;
    end if;

    -- Per-subscriber watermark consumer: '.<node name>.watermark'.
    v_wm_consumer := '.' || i_remote_node_name || '.watermark';

    -- The watermark consumer always starts at the global watermark ...
    perform pgq.register_consumer_at(
        i_queue_name,
        v_wm_consumer,
        global_watermark);

    -- ... while the worker starts at the custom tick when one was given.
    perform pgq.register_consumer_at(
        i_queue_name,
        i_remote_worker_name,
        coalesce(i_custom_tick_id, global_watermark));

    insert into pgq_node.subscriber_info
        (queue_name, subscriber_node, worker_name, watermark_name)
    values
        (i_queue_name, i_remote_node_name, i_remote_worker_name, v_wm_consumer);

    ret_code := 200;
    ret_note := 'Subscriber registered: '||i_remote_node_name;
    return;
end;
$$ language plpgsql security definer;