1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
\set ECHO none
-- wait for other processes, wait max 100 sec
do $$
declare c int;
begin
if pg_try_advisory_xact_lock(1) then
for i in 1..1000 loop
perform pg_sleep(0.1);
c := (select count(*) from pg_locks where locktype = 'advisory' and objid = 1 and not granted);
if c = 1 then
return;
end if;
end loop;
else
perform pg_advisory_xact_lock(1);
end if;
end;
$$;
\set VERBOSITY terse
--Wait for 'regress_pipe_test_owner' created notification to be sent by session A
SELECT dbms_pipe.receive_message('pipe_test_owner_created_notifier');
-- create new connection under the userid of 'regress_pipe_test_owner'
SET SESSION AUTHORIZATION regress_pipe_test_owner;
/* Tests receive_message(text,integer), next_item_type() and all versions of
* unpack_message_<type>() and purge(text)
*/
CREATE OR REPLACE FUNCTION receiveFrom(pipename text) RETURNS void AS $$
DECLARE
typ INTEGER;
BEGIN
WHILE true LOOP
PERFORM dbms_pipe.receive_message(pipename,2);
SELECT dbms_pipe.next_item_type() INTO typ;
IF typ = 0 THEN EXIT;
ELSIF typ=9 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_number();
ELSIF typ=11 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_text();
ELSIF typ=12 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_date();
ELSIF typ=13 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_timestamp();
ELSIF typ=23 THEN RAISE NOTICE 'RECEIVE %: %', typ, encode(dbms_pipe.unpack_message_bytea(),'escape');
ELSIF typ=24 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_record();
END IF;
END LOOP;
PERFORM dbms_pipe.purge(pipename);
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION bulkReceive() RETURNS void AS $$
DECLARE
typ INTEGER;
BEGIN
IF dbms_pipe.receive_message('named_pipe_2',2) = 1 THEN
RAISE NOTICE 'Timeout';
PERFORM pg_sleep(2);
PERFORM dbms_pipe.receive_message('named_pipe_2',2);
END IF;
WHILE true LOOP
SELECT dbms_pipe.next_item_type() INTO typ;
IF typ = 0 THEN EXIT;
ELSIF typ=9 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_number();
ELSIF typ=11 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_text();
ELSIF typ=12 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_date();
ELSIF typ=13 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_timestamp();
ELSIF typ=23 THEN RAISE NOTICE 'RECEIVE %: %', typ, encode(dbms_pipe.unpack_message_bytea()::bytea,'escape');
ELSIF typ=24 THEN RAISE NOTICE 'RECEIVE %: %', typ, dbms_pipe.unpack_message_record();
END IF;
END LOOP;
PERFORM dbms_pipe.purge('named_pipe_2');
END;
$$ LANGUAGE plpgsql;
-- Tests receive_message(text)
CREATE OR REPLACE FUNCTION checkReceive1(pipename text) RETURNS void AS $$
BEGIN
PERFORM dbms_pipe.receive_message(pipename);
RAISE NOTICE 'RECEIVE %',dbms_pipe.unpack_message_text();
END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION dropTempTable() RETURNS void AS $$
BEGIN
WHILE dbms_pipe.receive_message('pipe_name_3') <> 0 LOOP
CONTINUE;
END LOOP;
DROP TABLE TEMP;
END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION checkUniqueSessionNameB() RETURNS bool AS $$
DECLARE
result bool;
BEGIN
PERFORM dbms_pipe.receive_message('pipe_name_4');
SELECT dbms_pipe.unpack_message_text() = dbms_pipe.unique_session_name() INTO result;
RETURN result;
END; $$ LANGUAGE plpgsql;
\set ECHO all
-- Receives messages sent via an implicit pipe
SELECT receiveFrom('named_pipe');
-- Bulk receive messages
SELECT bulkReceive();
-- Receives messages sent via an explicit private pipe under the same user
-- 'regress_pipe_test_owner'
SELECT dbms_pipe.receive_message('recv_private1_notifier');
SELECT receiveFrom('private_pipe_1');
-- Switch user to 'regress_pipe_test_other'
DROP USER IF EXISTS regress_pipe_test_other;
CREATE USER regress_pipe_test_other;
SET SESSION AUTHORIZATION regress_pipe_test_other;
-- Try to receive messages sent via an explicit private pipe under the user
-- 'regress_pipe_test_other' who is not the owner of pipe.
-- insufficient privileges in case of 'private_pipe_2'.
SELECT dbms_pipe.receive_message('recv_private2_notifier');
SELECT receiveFrom('private_pipe_2');
-- These are explicit private pipes created using create_pipe(text,integer)
-- and create_pipe(text)
SELECT dbms_pipe.receive_message('recv_public1_notifier');
SELECT receiveFrom('public_pipe_3');
SELECT dbms_pipe.receive_message('recv_public2_notifier');
SELECT receiveFrom('public_pipe_4');
-- Switch back to user 'regress_pipe_test_owner'
SET SESSION AUTHORIZATION regress_pipe_test_owner;
DROP USER regress_pipe_test_other;
-- Tests receive_message(text)
SELECT checkReceive1('pipe_name_1');
SELECT checkReceive1('pipe_name_2');
-- Tests dbms_pipe.db_pipes view
SELECT name, items, "limit", private, owner
FROM dbms_pipe.db_pipes
WHERE name LIKE 'private%'
ORDER BY name;
-- Tests dbms_pipe.__list_pipes(); attribute size is not included
-- since it can be different across runs.
SELECT name, items, "limit", private, owner
FROM dbms_pipe.__list_pipes() AS (name varchar, items int4, siz int4, "limit" int4, private bool, owner varchar)
WHERE name <> 'pipe_name_4'
ORDER BY 1;
-- Tests remove_pipe(text)
SELECT dbms_pipe.remove_pipe('private_pipe_1');
SELECT dbms_pipe.remove_pipe('private_pipe_2');
SELECT dbms_pipe.remove_pipe('public_pipe_3');
SELECT dbms_pipe.remove_pipe('public_pipe_4');
SELECT dbms_pipe.purge('pipe_name_1');
SELECT dbms_pipe.purge('pipe_name_2');
-- Receives drop table notification from session A via 'pipe_name_3'
SELECT dropTempTable();
SELECT dbms_pipe.purge('pipe_name_3');
-- tests unique_session_name() (uses 'pipe_name_4')
SELECT checkUniqueSessionNameB();
SELECT dbms_pipe.purge('pipe_name_4');
DROP FUNCTION receiveFrom(text);
DROP FUNCTION checkReceive1(text);
DROP FUNCTION checkUniqueSessionNameB();
DROP FUNCTION bulkReceive();
DROP FUNCTION dropTempTable();
-- Perform a recieve on removed pipe resulting on timeout
SELECT dbms_pipe.receive_message('public_pipe_4',2);
SELECT dbms_pipe.purge('public_pipe_4');
SET SESSION AUTHORIZATION DEFAULT;
DROP USER regress_pipe_test_owner;
|