File: londiste.get_table_list.sql

package info (click to toggle)
londiste-sql 3.8-6
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 560 kB
  • sloc: sql: 2,742; python: 309; makefile: 18; sh: 1
file content (165 lines) | stat: -rw-r--r-- 6,439 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165

-- Remove any existing get_table_list(text) before re-creating it below:
-- CREATE OR REPLACE FUNCTION cannot change the OUT-parameter list (result
-- row type) of an already existing function.
-- NOTE(review): presumably this matters when upgrading from a version whose
-- result columns differed - confirm against older releases.
drop function if exists londiste.get_table_list(text);

create or replace function londiste.get_table_list(
    in i_queue_name text,
    out table_name text,
    out local boolean,
    out merge_state text,
    out custom_snapshot text,
    out table_attrs text,
    out dropped_ddl text,
    out copy_role text,
    out copy_pos int4,
    out dest_table text)
returns setof record as $$
-- ----------------------------------------------------------------------
-- Function: londiste.get_table_list(1)
--
--      List the tables registered in londiste.table_info for one queue,
--      one result row per table, ordered by (nr, table_name).
--
-- Parameters:
--      i_queue_name - cascaded queue name
--
-- Returns:
--      table_name      - fully-qualified table name
--      local           - whether events need to be applied to the local table
--      merge_state     - phase of the initial copy
--      custom_snapshot - remote snapshot of the COPY transaction
--      table_attrs     - urlencoded dict of table attributes
--      dropped_ddl     - partition combining: temporary place to keep DDL
--      copy_role       - partition combining: how to handle the copy;
--                        NULL unless merge_state is 'in-copy' or 'catching-up'
--      copy_pos        - position in the parallel copy working order;
--                        0 unless merge_state is 'in-copy' or 'catching-up'
--      dest_table      - table_info.dest_table as stored
--                        (presumably the local target table name when it
--                        differs from table_name - TODO confirm)
--
-- Meaning of copy_role for the copy process:
--
-- copy_role = lead:
--      when the copy starts, drop indexes and store them in dropped_ddl;
--      when the copy finishes, switch state to catching-up and wait until
--      copy_role becomes NULL;
--      in catching-up: if dropped_ddl is not NULL, restore the DDL.
-- copy_role = wait-copy:
--      do not start copying until the role changes (to wait-replay).
-- copy_role = wait-replay:
--      when the copy finishes, tag the table as 'catching-up';
--      wait until copy_role is NULL, then proceed.
-- ----------------------------------------------------------------------
declare
    tbl record;
begin
    for tbl in
        select t.table_name, t.local, t.merge_state, t.custom_snapshot,
               t.table_attrs, t.dropped_ddl, t.dest_table
            from londiste.table_info t
            where t.queue_name = i_queue_name
            order by t.nr, t.table_name
    loop
        -- plain columns are passed through unchanged
        table_name      := tbl.table_name;
        local           := tbl.local;
        merge_state     := tbl.merge_state;
        custom_snapshot := tbl.custom_snapshot;
        table_attrs     := tbl.table_attrs;
        dropped_ddl     := tbl.dropped_ddl;
        dest_table      := tbl.dest_table;

        -- defaults for tables that need no copy coordination
        copy_role := null;
        copy_pos := 0;

        -- only tables that are still being copied or are catching up
        -- get their role/position computed
        if tbl.merge_state in ('in-copy', 'catching-up') then
            select f.copy_role, f.copy_pos
                into copy_role, copy_pos
                from londiste._coordinate_copy(i_queue_name, tbl.table_name) f;
        end if;

        return next;
    end loop;

    return;
end;
$$ language plpgsql strict stable;


create or replace function londiste._coordinate_copy(
    in i_queue_name text, in i_table_name text,
    out copy_role text, out copy_pos int4)
as $$
-- ----------------------------------------------------------------------
-- Function: londiste._coordinate_copy(2)
--
--      If the table is in the middle of a copy from multiple partitions,
--      the copy processes need coordination.  This function computes the
--      role and working-order position of one (queue, table) pair.
--
-- Parameters:
--      i_queue_name - queue the table is registered under
--      i_table_name - table name as stored in londiste.table_info
--
-- Returns:
--      copy_role - NULL, 'lead', 'wait-copy' or 'wait-replay'
--      copy_pos  - number of matching registrations with a smaller queue
--                  name that are 'in-copy' while this table is 'in-copy'
--                  too; forced to 0 for the lead while it is 'in-copy'
--
-- Notes:
--      "Matching" registrations are table_info rows whose
--      coalesce(dest_table, table_name) equals this table's, whose queue has
--      the same combined_queue (or both have none) and whose merge_state is
--      not 'ok'.
--      If no row matches the given queue/table, the SELECT ... INTO leaves
--      its targets NULL, so copy_pos comes back as NULL rather than 0.
-- ----------------------------------------------------------------------
declare
    lead_queue      text;   -- smallest queue name among local matching registrations
    ddl_queue       text;   -- same, restricted to those holding dropped_ddl
    total_parts     int4;   -- number of local matching registrations
    finished_parts  int4;   -- of those, how many have a non-NULL state other than 'in-copy'
    merge_queue     text;   -- combined queue name; fetched but not used below
    cur_state       text;   -- merge_state of the requested table
    cur_dest        text;   -- dest_table of the requested table; fetched but not used below
    cur_ddl         text;   -- dropped_ddl of the requested table
begin
    copy_role := null;
    copy_pos := 0;

    select t.merge_state, t.dest_table, t.dropped_ddl,
           min(case when t2.local then t2.queue_name end) as _queue1,
           min(case when t2.local and t2.dropped_ddl is not null then t2.queue_name end) as _queue1ddl,
           count(case when t2.local then t2.table_name end) as _total,
           count(case when t2.local then nullif(t2.merge_state, 'in-copy') end) as _done,
           min(n.combined_queue) as _combined_queue,
           count(nullif(t2.queue_name < i_queue_name and t.merge_state = 'in-copy' and t2.merge_state = 'in-copy', false)) as _copy_pos
      into cur_state, cur_dest, cur_ddl,
           lead_queue, ddl_queue, total_parts, finished_parts,
           merge_queue, copy_pos
      from londiste.table_info t
      join pgq_node.node_info n
        on (n.queue_name = t.queue_name)
      left join pgq_node.node_info n2
        on (n2.combined_queue = n.combined_queue
            or (n2.combined_queue is null and n.combined_queue is null))
      left join londiste.table_info t2
        on (coalesce(t2.dest_table, t2.table_name) = coalesce(t.dest_table, t.table_name)
            and t2.queue_name = n2.queue_name
            and (t2.merge_state is null or t2.merge_state <> 'ok'))
     where t.queue_name = i_queue_name
       and t.table_name = i_table_name
     group by t.nr, t.table_name, t.local, t.merge_state, t.custom_snapshot,
              t.table_attrs, t.dropped_ddl, t.dest_table;

    -- be more robust against late joiners: a partition that already holds
    -- dropped ddl takes precedence as the lead
    lead_queue := coalesce(ddl_queue, lead_queue);

    -- no merge is happening (no candidate, or a single partition only):
    -- keep copy_role NULL and copy_pos as computed
    if lead_queue is null or total_parts = 1 then
        return;
    end if;

    if i_queue_name = lead_queue then
        -- this queue is the lead
        if cur_state = 'in-copy' then
            -- make sure the lead cannot be made to wait
            copy_pos := 0;

            if cur_ddl is null and finished_parts > 0 then
                -- seems to be a late addition, let it copy with indexes
                copy_role := 'wait-replay';
            elsif finished_parts < total_parts then
                -- show the role only if ddl needs to be dropped or already was
                copy_role := 'lead';
            end if;
        elsif cur_state = 'catching-up'
              and cur_ddl is not null
              and finished_parts < total_parts
        then
            -- show the role only while there are others to wait for
            copy_role := 'wait-replay';
        end if;
    else
        -- this queue is a follower
        if cur_state = 'in-copy' then
            if ddl_queue is not null or finished_parts > 0 then
                -- either the lead has dropped ddl (copy, then wait in replay
                -- until the lead has applied it), or ddl is not dropped and
                -- others are active (copy without touching ddl)
                copy_role := 'wait-replay';
            else
                -- wait for the lead to drop ddl
                copy_role := 'wait-copy';
            end if;
        elsif cur_state = 'catching-up' and ddl_queue is not null then
            -- show the role only if the lead still has to be waited for
            copy_role := 'wait-replay';
        end if;
    end if;

    return;
end;
$$ language plpgsql strict stable;