1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
|
create or replace function londiste.get_table_pending_fkeys(i_table_name text)
returns setof londiste.pending_fkeys as $$
-- ----------------------------------------------------------------------
-- Function: londiste.get_table_pending_fkeys(1)
--
-- Return dropped fkeys for table.
--
-- Parameters:
-- i_table_name - fqname
--
-- Returns:
-- desc
-- ----------------------------------------------------------------------
declare
fkeys record;
begin
for fkeys in
select *
from londiste.pending_fkeys
where from_table = i_table_name or to_table = i_table_name
order by 1,2,3
loop
return next fkeys;
end loop;
return;
end;
$$ language plpgsql strict stable;
create or replace function londiste.get_valid_pending_fkeys(i_queue_name text)
returns setof londiste.pending_fkeys as $$
-- ----------------------------------------------------------------------
-- Function: londiste.get_valid_pending_fkeys(1)
--
-- Returns dropped fkeys where both sides are in sync now.
--
-- Parameters:
-- i_queue_name - cascaded queue name
--
-- Returns:
-- desc
-- ----------------------------------------------------------------------
declare
fkeys record;
from_info record;
to_info record;
min_queue_name text;
begin
for fkeys in
select pf.*
from londiste.pending_fkeys pf
order by from_table, to_table, fkey_name
loop
select count(1) as num_total,
sum(case when t.queue_name = i_queue_name then 1 else 0 end) as num_matching,
sum(case when t.merge_state = 'ok' and t.custom_snapshot is null then 1 else 0 end) as num_ok
from londiste.table_info t
where coalesce(t.dest_table, t.table_name) = fkeys.from_table
and t.local
into from_info;
-- skip fkeys without known status
if from_info.num_total = 0 then
continue;
end if;
select count(1) as num_total,
sum(case when t.queue_name = i_queue_name then 1 else 0 end) as num_matching,
sum(case when t.merge_state = 'ok' and t.custom_snapshot is null then 1 else 0 end) as num_ok
from londiste.table_info t
where coalesce(t.dest_table, t.table_name) = fkeys.to_table
and t.local
into to_info;
-- skip fkeys without known status
if to_info.num_total = 0 then
continue;
end if;
-- skip if not all copies are finished
if from_info.num_ok < from_info.num_total then
continue;
end if;
if to_info.num_ok < to_info.num_total then
continue;
end if;
-- skip if table is not owned by i_queue_name
if from_info.num_matching = 0 and to_info.num_matching = 0 then
continue;
end if;
-- pick right queue
-- combined_root: first leaf node
-- combined_branch: branch node
-- default: first node
select coalesce(
min(case when c.node_type = 'root' then t.queue_name else null end),
min(case when c.node_type = 'branch' then c.queue_name else null end),
min(t.queue_name))
into min_queue_name
from londiste.table_info t
join pgq_node.node_info n on (n.queue_name = t.queue_name)
left join pgq_node.node_info c on (c.queue_name = n.combined_queue)
where coalesce(t.dest_table, t.table_name) in (fkeys.to_table, fkeys.from_table)
and t.local;
if i_queue_name = min_queue_name then
return next fkeys;
end if;
end loop;
return;
end;
$$ language plpgsql strict stable;
create or replace function londiste.drop_table_fkey(i_from_table text, i_fkey_name text)
returns integer as $$
-- ----------------------------------------------------------------------
-- Function: londiste.drop_table_fkey(2)
--
-- Drop one fkey, save in pending table.
-- ----------------------------------------------------------------------
declare
fkey record;
begin
select * into fkey
from londiste.find_table_fkeys(i_from_table)
where fkey_name = i_fkey_name and from_table = i_from_table;
if not found then
return 0;
end if;
insert into londiste.pending_fkeys values (fkey.from_table, fkey.to_table, i_fkey_name, fkey.fkey_def);
execute 'alter table only ' || londiste.quote_fqname(fkey.from_table)
|| ' drop constraint ' || quote_ident(i_fkey_name);
return 1;
end;
$$ language plpgsql strict;
drop function if exists londiste.restore_table_fkey(text, text);
create or replace function londiste.restore_table_fkey(i_from_table text, i_fkey_name text, i_lazy boolean default false)
returns text as $$
-- ----------------------------------------------------------------------
-- Function: londiste.restore_table_fkey(3)
--
-- Restore dropped fkey.
--
-- Parameters:
-- i_from_table - source table
-- i_fkey_name - fkey name
-- i_lazy - if true, then use multi-step create
--
-- Returns:
-- '' - done
-- sql - SQL statement to be executed
-- ----------------------------------------------------------------------
declare
fkey record;
is_valid boolean;
tbl_oid oid;
begin
select * into fkey
from londiste.pending_fkeys
where fkey_name = i_fkey_name and from_table = i_from_table
for update;
if not found then
return '';
end if;
if i_lazy then
tbl_oid := londiste.find_table_oid(i_from_table);
select convalidated into is_valid from pg_constraint c
where c.contype = 'f' and c.conrelid = tbl_oid and c.conname = i_fkey_name;
if not found then
-- create fkey with NOT VALID
return fkey.fkey_def || ' not valid';
elsif not is_valid then
-- validate
return 'alter table only ' || londiste.quote_fqname(fkey.from_table)
|| ' validate constraint ' || quote_ident(fkey.fkey_name);
end if;
else
-- create fkey
execute fkey.fkey_def;
end if;
delete from londiste.pending_fkeys
where fkey_name = fkey.fkey_name and from_table = fkey.from_table;
return '';
end;
$$ language plpgsql strict;
|