File: londiste_merge_fkeys.sql

package info (click to toggle)
londiste-sql 3.8-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 576 kB
  • sloc: sql: 2,742; python: 309; makefile: 19; sh: 2
file content (154 lines) | stat: -rw-r--r-- 6,430 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154

set client_min_messages = 'warning';
\set VERBOSITY 'terse'

\set ECHO none

create or replace function mkcascade(
    roots text[],
    branches text[], branch_target text,
    leafs text[], leaf_target text,
    tbls text[])
returns text as $$
declare
    node text;
    qname text;
    status int4;
    msg text;
    tbl text;
begin
    for node in select unnest(roots) loop
        qname := node;
        perform pgq_node.register_location(qname, node, 'dbname=' || node, false);
        perform pgq_node.create_node(qname, 'root', node, node || '_worker', null, null, null);
        for tbl in select unnest(tbls) loop
            select ret_code, ret_note into status, msg from londiste.global_add_table(node, tbl);
            if status != 200 then
                raise exception 'global_add_table - %/%', status, msg;
            end if;
        end loop;
    end loop;

    for node in select unnest(branches) loop
        qname := node;
        perform pgq_node.register_location(qname, node, 'dbname=' || node, false);
        perform pgq_node.register_location(qname, node||'-provider', 'dbname=' || node, false);
        perform pgq_node.create_node(qname, 'branch', node, node || '_worker', node||'-provider', 10, branch_target);
        for tbl in select unnest(tbls) loop
            select ret_code, ret_note into status, msg from londiste.global_add_table(node, tbl);
            if status != 200 then
                raise exception 'global_add_table - %/%', status, msg;
            end if;
        end loop;
    end loop;

    for node in select unnest(leafs) loop
        qname := node;
        perform pgq_node.register_location(qname, node, 'dbname=' || node, false);
        perform pgq_node.register_location(qname, node||'-provider', 'dbname=' || node, false);
        perform pgq_node.create_node(qname, 'leaf', node, node || '_worker', node||'-provider', 11, leaf_target);

        for tbl in select unnest(tbls) loop
            select ret_code, ret_note into status, msg from londiste.global_add_table(node, tbl);
            if status != 200 then
                raise exception 'global_add_table - %/%', status, msg;
            end if;
        end loop;
    end loop;
    return 'done';
end;
$$ language plpgsql;

\set ECHO all

-- leaf1/leaf2->root

\set target 'fx1-target'
\set leaf1 'fx1-leaf1'
\set leaf2 'fx1-leaf2'
\set tbl1 'fx1.table1'
\set tbl2 'fx1.table2'

select mkcascade(array[:'target'], array[]::text[], null, array[:'leaf1', :'leaf2'], :'target', array[:'tbl1', :'tbl2']);

insert into londiste.pending_fkeys values (:'tbl1', :'tbl2', 'name', 'def');
select * from londiste.get_table_pending_fkeys(:'tbl1');

update londiste.table_info set merge_state='ok', local=true where table_name in (:'tbl1', :'tbl2');
select * from londiste.get_valid_pending_fkeys(:'target');
select * from londiste.get_valid_pending_fkeys(:'leaf1'); -- pick
select * from londiste.get_valid_pending_fkeys(:'leaf2');

update londiste.table_info set merge_state='catching-up' where table_name = :'tbl1' and queue_name = :'leaf1';
select * from londiste.get_valid_pending_fkeys(:'target');
select * from londiste.get_valid_pending_fkeys(:'leaf1'); -- no
select * from londiste.get_valid_pending_fkeys(:'leaf2');

update londiste.table_info set merge_state=null, local=false where table_name = :'tbl1' and queue_name = :'leaf1';
select * from londiste.get_valid_pending_fkeys(:'target');
select * from londiste.get_valid_pending_fkeys(:'leaf1'); -- pick
select * from londiste.get_valid_pending_fkeys(:'leaf2');

update londiste.table_info set merge_state=null, local=false where table_name = :'tbl2' and queue_name = :'leaf1';
select * from londiste.get_valid_pending_fkeys(:'target');
select * from londiste.get_valid_pending_fkeys(:'leaf1');
select * from londiste.get_valid_pending_fkeys(:'leaf2'); -- pick?

-- leaf1/leaf2->branch

\set target 'fx2-branch'
\set leaf1 'fx2-leaf1'
\set leaf2 'fx2-leaf2'
\set tbl1 'fx2.table1'
\set tbl2 'fx2.table2'

select mkcascade(array[]::text[], array[:'target']::text[], null, array[:'leaf1', :'leaf2'], :'target', array[:'tbl1', :'tbl2']);

insert into londiste.pending_fkeys values (:'tbl1', :'tbl2', 'name', 'def');
select * from londiste.get_table_pending_fkeys(:'tbl1');

update londiste.table_info set merge_state='ok', local=true where table_name in (:'tbl1', :'tbl2');
select * from londiste.get_valid_pending_fkeys(:'target'); -- pick
select * from londiste.get_valid_pending_fkeys(:'leaf1');
select * from londiste.get_valid_pending_fkeys(:'leaf2');

update londiste.table_info set merge_state='catching-up' where table_name = :'tbl1' and queue_name = :'leaf1';
select * from londiste.get_valid_pending_fkeys(:'target'); -- no
select * from londiste.get_valid_pending_fkeys(:'leaf1');
select * from londiste.get_valid_pending_fkeys(:'leaf2');

update londiste.table_info set merge_state=null, local=false where table_name = :'tbl1' and queue_name = :'leaf1';
select * from londiste.get_valid_pending_fkeys(:'target'); -- pick
select * from londiste.get_valid_pending_fkeys(:'leaf1');
select * from londiste.get_valid_pending_fkeys(:'leaf2');


-- leaf1/leaf2->no branch

\set leaf1 'fx3-leaf1'
\set leaf2 'fx3-leaf2'
\set tbl1 'fx3.table1'
\set tbl2 'fx3.table2'

select mkcascade(array[]::text[], array[]::text[], null, array[:'leaf1', :'leaf2'], null, array[:'tbl1', :'tbl2']);

insert into londiste.pending_fkeys values (:'tbl1', :'tbl2', 'name', 'def');
select * from londiste.get_table_pending_fkeys(:'tbl1');

update londiste.table_info set merge_state='ok', local=true where table_name in (:'tbl1', :'tbl2');
select * from londiste.get_valid_pending_fkeys(:'leaf1'); -- pick
select * from londiste.get_valid_pending_fkeys(:'leaf2');

update londiste.table_info set merge_state='catching-up' where table_name = :'tbl1' and queue_name = :'leaf1';
select * from londiste.get_valid_pending_fkeys(:'leaf1');
select * from londiste.get_valid_pending_fkeys(:'leaf2');

update londiste.table_info set merge_state=null, local=false where table_name = :'tbl1' and queue_name = :'leaf1';
select * from londiste.get_valid_pending_fkeys(:'leaf1'); -- pick
select * from londiste.get_valid_pending_fkeys(:'leaf2');

update londiste.table_info set merge_state=null, local=false where table_name = :'tbl2' and queue_name = :'leaf1';
select * from londiste.get_valid_pending_fkeys(:'leaf1');
select * from londiste.get_valid_pending_fkeys(:'leaf2'); -- pick