1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
|
\set ECHO none
-- wait for other processes, wait max 100 sec
do $$
declare c int;
begin
if pg_try_advisory_xact_lock(1) then
for i in 1..1000 loop
perform pg_sleep(0.1);
c := (select count(*) from pg_locks where locktype = 'advisory' and objid = 1 and not granted);
if c = 1 then
return;
end if;
end loop;
else
perform pg_advisory_xact_lock(1);
end if;
end;
$$;
SET client_min_messages = warning;
DROP TABLE IF EXISTS TEMP;
CREATE TABLE TEMP(id integer,name text);
INSERT INTO TEMP VALUES (1,'bob'),(2,'rob'),(3,'john');
DROP USER IF EXISTS regress_pipe_test_owner;
CREATE ROLE regress_pipe_test_owner WITH CREATEROLE;
ALTER TABLE TEMP OWNER TO regress_pipe_test_owner;
SET client_min_messages = notice;
-- Notify session B of 'regress_pipe_test_owner' having been created.
SELECT dbms_pipe.pack_message(1);
SELECT dbms_pipe.send_message('pipe_test_owner_created_notifier');
-- Create a new connection under the userid of regress_pipe_test_owner
SET SESSION AUTHORIZATION regress_pipe_test_owner;
/* create an implicit pipe and sends message using
* send_message(text,integer,integer)
*/
CREATE OR REPLACE FUNCTION send(pipename text) RETURNS void AS $$
BEGIN
IF dbms_pipe.send_message(pipename,2,10) = 1 THEN
RAISE NOTICE 'Timeout';
PERFORM pg_sleep(2);
PERFORM dbms_pipe.send_message(pipename,2,10);
END IF;
END; $$ LANGUAGE plpgsql;
-- Test pack_message for all supported types and send_message
CREATE OR REPLACE FUNCTION createImplicitPipe() RETURNS void AS $$
DECLARE
row TEMP%ROWTYPE;
BEGIN
PERFORM dbms_pipe.pack_message('Message From Session A'::text);
PERFORM send('named_pipe');
PERFORM dbms_pipe.pack_message('2013-01-01'::date);
PERFORM send('named_pipe');
PERFORM dbms_pipe.pack_message('2013-01-01 09:00:00'::timestamp);
PERFORM send('named_pipe');
PERFORM dbms_pipe.pack_message('2013-01-01 09:00:00-08'::timestamptz);
PERFORM send('named_pipe');
PERFORM dbms_pipe.pack_message(12345.6789::numeric);
PERFORM send('named_pipe');
PERFORM dbms_pipe.pack_message(12345::integer);
PERFORM send('named_pipe');
PERFORM dbms_pipe.pack_message(99999999999::bigint);
PERFORM send('named_pipe');
PERFORM dbms_pipe.pack_message(E'\\201'::bytea);
PERFORM send('named_pipe');
SELECT * INTO row FROM TEMP WHERE id=2;
PERFORM dbms_pipe.pack_message(row);
PERFORM send('named_pipe');
END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION bulkSend() RETURNS void AS $$
DECLARE
row TEMP%ROWTYPE;
BEGIN
PERFORM dbms_pipe.pack_message('Message From Session A'::text);
PERFORM dbms_pipe.pack_message('2013-01-01'::date);
PERFORM dbms_pipe.pack_message('2013-01-01 09:00:00'::timestamp);
PERFORM dbms_pipe.pack_message('2013-01-01 09:00:00-08'::timestamptz);
PERFORM dbms_pipe.pack_message(12345.6789::numeric);
PERFORM dbms_pipe.pack_message(12345::integer);
PERFORM dbms_pipe.pack_message(99999999999::bigint);
PERFORM dbms_pipe.pack_message(E'\\201'::bytea);
SELECT * INTO row FROM TEMP WHERE id=2;
PERFORM dbms_pipe.pack_message(row);
PERFORM send('named_pipe_2');
END; $$ LANGUAGE plpgsql;
/* Creates an explicit pipe using either create_pipe(text,integer,bool),
* create_pipe(text,integer) OR create_pipe(text).
* In case third parameter (bool) absent, default is false, that is, it's a public pipe.
*/
CREATE OR REPLACE FUNCTION createPipe(name text,ver integer) RETURNS void AS $$
BEGIN
IF ver = 3 THEN
PERFORM dbms_pipe.create_pipe(name,4,true);
ELSIF ver = 2 THEN
PERFORM dbms_pipe.create_pipe(name,4);
ELSE
PERFORM dbms_pipe.create_pipe(name);
END IF;
END; $$ LANGUAGE plpgsql;
/* Testing create_pipe for different versions, one of them, is the case of
* private pipe
*/
CREATE OR REPLACE FUNCTION createExplicitPipe(pipename text,create_version integer) RETURNS void AS $$
DECLARE
row TEMP%ROWTYPE;
BEGIN
PERFORM createPipe(pipename,create_version);
PERFORM dbms_pipe.reset_buffer();
PERFORM dbms_pipe.pack_message('Message From Session A'::text);
PERFORM send(pipename);
PERFORM dbms_pipe.pack_message('2013-01-01'::date);
PERFORM send(pipename);
PERFORM dbms_pipe.pack_message('2013-01-01 09:00:00'::timestamp);
PERFORM send(pipename);
PERFORM dbms_pipe.pack_message('2013-01-01 09:00:00-08'::timestamptz);
PERFORM send(pipename);
PERFORM dbms_pipe.pack_message(12345.6789::numeric);
PERFORM send(pipename);
PERFORM dbms_pipe.pack_message(12345::integer);
PERFORM send(pipename);
PERFORM dbms_pipe.pack_message(99999999999::bigint);
PERFORM send(pipename);
PERFORM dbms_pipe.pack_message(E'\\201'::bytea);
PERFORM send(pipename);
SELECT * INTO row FROM TEMP WHERE id=2;
PERFORM dbms_pipe.pack_message(row);
PERFORM send(pipename);
END; $$ LANGUAGE plpgsql;
-- Test send_message(text)
CREATE OR REPLACE FUNCTION checkSend1() RETURNS void AS $$
BEGIN
PERFORM dbms_pipe.pack_message('checking one-argument send_message()');
PERFORM dbms_pipe.send_message('pipe_name_1');
END; $$ LANGUAGE plpgsql;
-- Test send_message(text,integer)
CREATE OR REPLACE FUNCTION checkSend2() RETURNS void AS $$
BEGIN
PERFORM dbms_pipe.pack_message('checking two-argument send_message()');
IF dbms_pipe.send_message('pipe_name_2',2) = 1 THEN
RAISE NOTICE 'Timeout';
PERFORM pg_sleep(2);
PERFORM dbms_pipe.send_message('pipe_name_2',2);
END IF;
END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION notifyDropTemp() RETURNS void AS $$
BEGIN
PERFORM dbms_pipe.pack_message(1);
PERFORM dbms_pipe.send_message('pipe_name_3');
END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION checkUniqueSessionNameA() RETURNS void AS $$
BEGIN
PERFORM dbms_pipe.pack_message(dbms_pipe.unique_session_name());
PERFORM dbms_pipe.send_message('pipe_name_4');
END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION notify(pipename text) RETURNS void AS $$
BEGIN
PERFORM dbms_pipe.pack_message(1);
PERFORM dbms_pipe.send_message(pipename);
END; $$ LANGUAGE plpgsql;
\set ECHO all
SELECT createImplicitPipe();
-- Bulk send messages
SELECT bulkSend();
-- An explicit private pipe
SELECT notify('recv_private1_notifier');
SELECT createExplicitPipe('private_pipe_1',3);
-- An explicit private pipe
SELECT notify('recv_private2_notifier');
SELECT createExplicitPipe('private_pipe_2',3);
-- An explicit public pipe (uses two-argument create_pipe)
SELECT notify('recv_public1_notifier');
SELECT createExplicitPipe('public_pipe_3',2);
-- An explicit public pipe (uses one-argument create_pipe)
SELECT notify('recv_public2_notifier');
SELECT createExplicitPipe('public_pipe_4',1);
-- tests send_message(text)
SELECT checkSend1();
-- tests send_message(text,integer)
SELECT checkSend2();
SELECT notifyDropTemp();
-- tests unique_session_name()
SELECT checkUniqueSessionNameA();
DROP FUNCTION createImplicitPipe();
DROP FUNCTION createExplicitPipe(text,integer);
DROP FUNCTION createPipe(text,integer);
DROP FUNCTION checkSend1();
DROP FUNCTION checkSend2();
DROP FUNCTION checkUniqueSessionNameA();
DROP FUNCTION bulkSend();
DROP FUNCTION notifyDropTemp();
DROP FUNCTION notify(text);
DROP FUNCTION send(text);
|