File: pgq_node.set_subscriber_watermark.sql

package info (click to toggle)
pgq-node 3.5-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 364 kB
  • sloc: sql: 1,412; python: 309; makefile: 14; sh: 1
file content (56 lines) | stat: -rw-r--r-- 1,647 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

create or replace function pgq_node.set_subscriber_watermark(
    in i_queue_name text,
    in i_node_name text,
    in i_watermark bigint,
    out ret_code int4,
    out ret_note text)
returns record as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.set_subscriber_watermark(3)
--
--      Notify provider about subscribers lowest watermark.
--
--      Called on provider at interval by each worker  
--
-- Parameters:
--      i_queue_name - cascaded queue name
--      i_node_name - subscriber node name
--      i_watermark - tick_id
--
-- Returns:
--      ret_code    - error code
--      ret_note    - description
-- ----------------------------------------------------------------------
declare
    n       record;
    wm_name text;
begin
    wm_name := '.' || i_node_name || '.watermark';
    select * into n from pgq.get_consumer_info(i_queue_name, wm_name);
    if not found then
        select 404, 'node '||i_node_name||' not subscribed to queue ', i_queue_name
            into ret_code, ret_note;
        return;
    end if;

    -- todo: check if wm sane?
    if i_watermark < n.last_tick then
        select 405, 'watermark must not be moved backwards'
            into ret_code, ret_note;
        return;
    elsif i_watermark = n.last_tick then
        select 100, 'watermark already set'
            into ret_code, ret_note;
        return;
    end if;

    perform pgq.register_consumer_at(i_queue_name, wm_name, i_watermark);

    select 200, wm_name || ' set to ' || i_watermark::text
        into ret_code, ret_note;
    return;
end;
$$ language plpgsql security definer;