File: pgq_node.get_subscriber_info.sql

package info (click to toggle)
pgq-node 3.5-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 364 kB
  • sloc: sql: 1,412; python: 309; makefile: 14; sh: 1
file content (43 lines) | stat: -rw-r--r-- 1,395 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

create or replace function pgq_node.get_subscriber_info(
    in i_queue_name text,

    out node_name text,
    out worker_name text,
    out node_watermark int8)
returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.get_subscriber_info(1)
--
--      Get subscriber list for the local node.
--
--      It may be out-of-date, due to in-progress
--      administrative change.
--      Node's local provider info ( pgq_node.get_node_info() or pgq_node.get_worker_state(1) )
--      is the authoritative source.
--
-- Parameters:
--      i_queue_name  - cascaded queue name
--
-- Returns:
--      node_name       - node name that uses current node as provider
--      worker_name     - consumer that maintains remote node
--      local_watermark - lowest tick_id on subscriber
-- ----------------------------------------------------------------------
declare
    _watermark_name text;
begin
    for node_name, worker_name, _watermark_name in
        select s.subscriber_node, s.worker_name, s.watermark_name
          from pgq_node.subscriber_info s
         where s.queue_name = i_queue_name
         order by 1
    loop
        select last_tick into node_watermark
            from pgq.get_consumer_info(i_queue_name, _watermark_name);
        return next;
    end loop;
    return;
end;
$$ language plpgsql security definer;