File: pgq_node.maint_watermark.sql

package info (click to toggle)
pgq-node 3.5-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 364 kB
  • sloc: sql: 1,412; python: 309; makefile: 14; sh: 1
file content (31 lines) | stat: -rw-r--r-- 845 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

create or replace function pgq_node.maint_watermark(i_queue_name text)
returns int4 as $$
-- ----------------------------------------------------------------------
-- Function: pgq_node.maint_watermark(1)
--
--      Maintenance step that moves the global watermark of a queue.
--      Acts only when pgq_node.node_info holds a 'root' row for the
--      queue; otherwise it is a no-op.
--
-- Parameters:
--      i_queue_name    - name of the queue to maintain
--
-- Returns:
--      0 - always; tells pgqd to call just once
-- ----------------------------------------------------------------------
declare
    -- Lag of the '.global_watermark' consumer at which the watermark is moved.
    _max_lag constant interval := interval '5 minutes';
    _wm_lag interval;
begin
    -- Row-lock the queue's node entry, matching only when it is a root node.
    -- NOTE(review): presumably serializes concurrent maintenance runs - confirm.
    perform 1
       from pgq_node.node_info n
      where n.queue_name = i_queue_name
        and n.node_type = 'root'
        for update;

    if found then
        select ci.lag
          into _wm_lag
          from pgq.get_consumer_info(i_queue_name, '.global_watermark') ci;

        -- If no consumer row came back, _wm_lag stays NULL, the comparison
        -- is not true and the watermark is left alone.
        if _wm_lag >= _max_lag then
            -- NOTE(review): NULL is passed as the watermark value; presumably
            -- the callee works out the new position itself - verify against
            -- pgq_node.set_global_watermark.
            perform pgq_node.set_global_watermark(i_queue_name, null);
        end if;
    end if;

    return 0;
end;
$$ language plpgsql;