File: pgq_node.set_global_watermark.sql

package info (click to toggle)
pgq-node 3.5-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 364 kB
  • sloc: sql: 1,412; python: 309; makefile: 14; sh: 1
file content (94 lines) | stat: -rw-r--r-- 3,117 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94

create or replace function pgq_node.set_global_watermark(
    in i_queue_name text,
    in i_watermark bigint,
    out ret_code int4,
    out ret_note text)
returns record as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.set_global_watermark(2)
--
--      Move global watermark on branch/leaf, publish on root.
--
-- Parameters:
--      i_queue_name    - queue name
--      i_watermark     - global tick_id that is processed everywhere.
--                        NULL on root, then local wm is published.
-- ----------------------------------------------------------------------
declare
    this        record;
    _wm         bigint;
    wm_consumer text;
begin
    wm_consumer = '.global_watermark';

    select node_type, queue_name, worker_name into this
        from pgq_node.node_info
        where queue_name = i_queue_name
        for update;
    if not found then
        select 404, 'Queue' || i_queue_name || ' not found'
          into ret_code, ret_note;
        return;
    end if;

    _wm = i_watermark;
    if this.node_type = 'root' then
        if i_watermark is null then
            select f.ret_code, f.ret_note, f.local_watermark
                into ret_code, ret_note, _wm
                from pgq_node.get_node_info(i_queue_name) f;
            if ret_code >= 300 then
                return;
            end if;
            if _wm is null then
                raise exception 'local_watermark=NULL from get_node_info()?';
            end if;
        end if;

        -- move watermark
        perform pgq.register_consumer_at(i_queue_name, wm_consumer, _wm);

        -- send event downstream
        perform pgq.insert_event(i_queue_name, 'pgq.global-watermark', _wm::text,
                                 i_queue_name, null, null, null);
        -- update root workers pos to avoid it getting stale
        update pgq_node.local_state
            set last_tick_id = _wm
            where queue_name = i_queue_name
                and consumer_name = this.worker_name;
    elsif this.node_type = 'branch' then
        if i_watermark is null then
            select 500, 'bad usage: wm=null on branch node'
                into ret_code, ret_note;
            return;
        end if;

        -- tick can be missing if we are processing
        -- old batches that set watermark outside
        -- current range
        perform 1 from pgq.tick t, pgq.queue q
          where q.queue_name = i_queue_name
            and t.tick_queue = q.queue_id
            and t.tick_id = _wm;
        if not found then
            select 200, 'Skipping global watermark update to ' || _wm::text
                into ret_code, ret_note;
            return;
        end if;

        -- move watermark
        perform pgq.register_consumer_at(i_queue_name, wm_consumer, _wm);
    else
        select 100, 'Ignoring global watermark in leaf'
            into ret_code, ret_note;
        return;
    end if;

    select 200, 'Global watermark set to ' || _wm::text
        into ret_code, ret_note;
    return;
end;
$$ language plpgsql security definer;