1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
|
from select import select
import pytest
import psycopg
from psycopg import pq
from psycopg.generators import execute
def execute_wait(pgconn):
    """Drive the `execute()` generator for *pgconn* and return its outcome.

    The generator is handed to `psycopg.waiting.wait()` together with the
    connection's socket; whatever `wait()` returns is passed back unchanged.
    """
    gen = execute(pgconn)
    fileno = pgconn.socket
    return psycopg.waiting.wait(gen, fileno)
def test_send_query(pgconn):
    """Drive an async two-statement query by hand: send, flush, then read.

    No psycopg helper is used: the socket is polled with `select()` and the
    results are collected one at a time with `get_result()`.
    """
    # This test shows how to process an async query in all its glory
    pgconn.nonblocking = 1

    # Long query to make sure we have to wait on send
    padding = b"x" * 1_000_000
    pgconn.send_query(
        b"/* %s */ select 'x' as f from pg_sleep(0.01); select 1 as foo;" % padding
    )

    # send loop: keep calling flush() until it returns 0
    waited_on_send = 0
    while pgconn.flush() != 0:
        waited_on_send += 1

        readable, writable, _ = select([pgconn.socket], [pgconn.socket], [])
        assert not readable or not writable
        if readable and not writable:
            pgconn.consume_input()
        # in every case go round again and call flush() once more

    # TODO: this check is not reliable, it fails on travis sometimes
    # assert waited_on_send

    # read loop: gather results until get_result() returns None
    results = []
    while True:
        pgconn.consume_input()
        if pgconn.is_busy():
            # still busy: block until the socket is readable, then retry
            select([pgconn.socket], [], [])
            continue

        res = pgconn.get_result()
        if res is None:
            break
        assert res.status == pq.ExecStatus.TUPLES_OK
        results.append(res)

    # one result per statement, each with a single named column
    assert len(results) == 2
    expected = [(b"f", b"x"), (b"foo", b"1")]
    for res, (name, value) in zip(results, expected):
        assert res.nfields == 1
        assert res.fname(0) == name
        assert res.get_value(0, 0) == value
def test_send_query_compact_test(pgconn):
    """Same scenario as `test_send_query`, driven by `execute_wait()`.

    Also checks that sending a query on a finished connection raises
    `OperationalError`.
    """
    # Like the above test but use psycopg facilities for compactness
    padding = b"x" * 1_000_000
    query = b"/* %s */ select 'x' as f from pg_sleep(0.01); select 1 as foo;" % padding
    pgconn.send_query(query)

    results = execute_wait(pgconn)
    assert len(results) == 2

    expected = [(b"f", b"x"), (b"foo", b"1")]
    for res, (name, value) in zip(results, expected):
        assert res.nfields == 1
        assert res.fname(0) == name
        assert res.get_value(0, 0) == value

    # a finished connection must refuse further queries
    pgconn.finish()
    with pytest.raises(psycopg.OperationalError):
        pgconn.send_query(b"select 1")
def test_single_row_mode(pgconn):
    """In single-row mode every row arrives in its own result.

    A two-row query is expected to produce three results: one
    `SINGLE_TUPLE` result per row, then an empty `TUPLES_OK` result.
    """
    pgconn.send_query(b"select generate_series(1,2)")
    pgconn.set_single_row_mode()

    results = execute_wait(pgconn)
    assert len(results) == 3

    *row_results, closing = results
    for res, value in zip(row_results, (b"1", b"2")):
        assert res.status == pq.ExecStatus.SINGLE_TUPLE
        assert res.ntuples == 1
        assert res.get_value(0, 0) == value

    # the trailing result carries the final status and no rows
    assert closing.status == pq.ExecStatus.TUPLES_OK
    assert closing.ntuples == 0
@pytest.mark.libpq(">= 17")
def test_chunked_rows_mode(pgconn):
    """In chunked-rows mode the rows arrive in groups of the requested size.

    Seven rows requested in chunks of three are expected as `TUPLES_CHUNK`
    results of 3, 3 and 1 rows, followed by an empty `TUPLES_OK` result.
    """
    pgconn.send_query(b"select generate_series(1,7)")
    pgconn.set_chunked_rows_mode(3)

    results = execute_wait(pgconn)
    assert len(results) == 4

    *chunks, closing = results
    expected_chunks = [
        [b"1", b"2", b"3"],
        [b"4", b"5", b"6"],
        [b"7"],
    ]
    for res, values in zip(chunks, expected_chunks):
        assert res.status == pq.ExecStatus.TUPLES_CHUNK
        assert res.ntuples == len(values)
        assert [res.get_value(i, 0) for i in range(len(values))] == values

    # the trailing result carries the final status and no rows
    assert closing.status == pq.ExecStatus.TUPLES_OK
    assert closing.ntuples == 0
def test_send_query_params(pgconn):
    """`send_query_params()` runs a parametrized query asynchronously.

    After `finish()` the same call is expected to raise `OperationalError`.
    """
    query = b"select $1::int + $2"
    pgconn.send_query_params(query, [b"5", b"3"])

    [res] = execute_wait(pgconn)
    assert res.status == pq.ExecStatus.TUPLES_OK
    assert res.get_value(0, 0) == b"8"

    # a finished connection must refuse further queries
    pgconn.finish()
    with pytest.raises(psycopg.OperationalError):
        pgconn.send_query_params(b"select $1", [b"1"])
def test_send_prepare(pgconn):
    """Prepare a statement asynchronously, then execute it.

    After `finish()` both the prepare and the execute calls are expected
    to raise `OperationalError`.
    """
    name = b"prep"
    query = b"select $1::int + $2::int"
    params = [b"3", b"5"]

    pgconn.send_prepare(name, query)
    [res] = execute_wait(pgconn)
    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message

    pgconn.send_query_prepared(name, params)
    [res] = execute_wait(pgconn)
    assert res.get_value(0, 0) == b"8"

    # a finished connection must refuse both operations
    pgconn.finish()
    with pytest.raises(psycopg.OperationalError):
        pgconn.send_prepare(name, query)
    with pytest.raises(psycopg.OperationalError):
        pgconn.send_query_prepared(name, params)
def test_send_prepare_types(pgconn):
    """Prepare a statement passing explicit parameter types, then run it."""
    name = b"prep"
    # 23 is presumably the int4 type OID; the query has no casts of its own
    param_types = [23, 23]

    pgconn.send_prepare(name, b"select $1 + $2", param_types)
    [res] = execute_wait(pgconn)
    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message

    pgconn.send_query_prepared(name, [b"3", b"5"])
    [res] = execute_wait(pgconn)
    assert res.get_value(0, 0) == b"8"
def test_send_prepared_binary_in(pgconn):
    """Pass the same bytes as a text-format and as a binary-format parameter.

    The value contains a NUL byte: the server is expected to report length 3
    for the text-format parameter and length 7 for the binary-format one.

    A `param_formats` list whose length does not match the parameters must
    raise `ValueError`, both on the async `send_query_prepared()` path that
    this test is about and on the sync `exec_params()` one.
    """
    val = b"foo\00bar"
    pgconn.send_prepare(b"", b"select length($1::bytea), length($2::bytea)")
    (res,) = execute_wait(pgconn)
    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message

    pgconn.send_query_prepared(b"", [val, val], param_formats=[0, 1])
    (res,) = execute_wait(pgconn)
    assert res.status == pq.ExecStatus.TUPLES_OK
    # presumably the text-format value stops at the NUL byte
    assert res.get_value(0, 0) == b"3"
    assert res.get_value(0, 1) == b"7"

    # One value but two formats: rejected on the async path too.
    with pytest.raises(ValueError):
        pgconn.send_query_prepared(b"", [val], param_formats=[1, 1])

    with pytest.raises(ValueError):
        pgconn.exec_params(b"select $1::bytea", [val], param_formats=[1, 1])
@pytest.mark.parametrize("fmt, out", [(0, b"\\x666f6f00626172"), (1, b"foo\00bar")])
def test_send_prepared_binary_out(pgconn, fmt, out):
    """Fetch a bytea value back in the requested result format.

    *fmt* is passed as `result_format`; *out* is the value expected back
    for that format.
    """
    payload = b"foo\00bar"

    pgconn.send_prepare(b"", b"select $1::bytea")
    [prepared] = execute_wait(pgconn)
    assert prepared.status == pq.ExecStatus.COMMAND_OK, prepared.error_message

    pgconn.send_query_prepared(
        b"",
        [payload],
        param_formats=[1],
        result_format=fmt,
    )
    [fetched] = execute_wait(pgconn)
    assert fetched.status == pq.ExecStatus.TUPLES_OK
    assert fetched.get_value(0, 0) == out
def test_send_describe_prepared(pgconn):
    """Describe a prepared statement asynchronously.

    The description is expected to expose the single output column `fld`
    with type OID 20 and to carry no rows.  After `finish()` the call is
    expected to raise `OperationalError`.
    """
    name = b"prep"

    pgconn.send_prepare(name, b"select $1::int8 + $2::int8 as fld")
    [prepared] = execute_wait(pgconn)
    assert prepared.status == pq.ExecStatus.COMMAND_OK, prepared.error_message

    pgconn.send_describe_prepared(name)
    [described] = execute_wait(pgconn)
    assert described.nfields == 1
    assert described.ntuples == 0
    assert described.fname(0) == b"fld"
    assert described.ftype(0) == 20

    # a finished connection must refuse the request
    pgconn.finish()
    with pytest.raises(psycopg.OperationalError):
        pgconn.send_describe_prepared(name)
@pytest.mark.libpq(">= 17")
def test_send_close_prepared(pgconn):
    """Close a prepared statement asynchronously.

    Once closed, describing the same statement is expected to end in a
    `FATAL_ERROR` result.
    """
    name = b"prep"

    pgconn.send_prepare(name, b"select $1::int8 + $2::int8 as fld")
    [prepared] = execute_wait(pgconn)
    assert prepared.status == pq.ExecStatus.COMMAND_OK, prepared.error_message

    pgconn.send_close_prepared(name)
    [closed] = execute_wait(pgconn)
    assert closed.status == pq.ExecStatus.COMMAND_OK, closed.error_message

    # Because we closed it, describing should not work
    pgconn.send_describe_prepared(name)
    [described] = execute_wait(pgconn)
    assert described.status == pq.ExecStatus.FATAL_ERROR
@pytest.mark.libpq("< 17")
def test_send_close_prepared_no_close(pgconn):
    """Where the `libpq("< 17")` marker applies, closing is not supported."""
    expect_unsupported = pytest.raises(psycopg.NotSupportedError)
    with expect_unsupported:
        pgconn.send_close_prepared(b"prep")
@pytest.mark.crdb_skip("server-side cursor")
def test_send_describe_portal(pgconn):
    """Describe, asynchronously, a cursor declared inside a transaction.

    The description is expected to expose the single column `foo`.  After
    `finish()` the call is expected to raise `OperationalError`.
    """
    portal = b"cur"
    setup = (
        b"\n"
        b"begin;\n"
        b"declare cur cursor for select * from generate_series(1,10) foo;\n"
    )

    declared = pgconn.exec_(setup)
    assert declared.status == pq.ExecStatus.COMMAND_OK, declared.error_message

    pgconn.send_describe_portal(portal)
    [described] = execute_wait(pgconn)
    assert described.status == pq.ExecStatus.COMMAND_OK, described.error_message
    assert described.nfields == 1
    assert described.fname(0) == b"foo"

    # a finished connection must refuse the request
    pgconn.finish()
    with pytest.raises(psycopg.OperationalError):
        pgconn.send_describe_portal(portal)
@pytest.mark.libpq(">= 17")
@pytest.mark.crdb_skip("close portal")
def test_send_close_portal(pgconn):
    """Close, asynchronously, a cursor declared inside a transaction.

    Once closed, describing the same portal is expected to end in a
    `FATAL_ERROR` result.
    """
    portal = b"cur"
    setup = (
        b"\n"
        b"begin;\n"
        b"declare cur cursor for select * from generate_series(1,10) foo;\n"
    )

    declared = pgconn.exec_(setup)
    assert declared.status == pq.ExecStatus.COMMAND_OK, declared.error_message

    pgconn.send_close_portal(portal)
    [closed] = execute_wait(pgconn)
    assert closed.status == pq.ExecStatus.COMMAND_OK, closed.error_message

    # Because we closed it, describing should not work
    pgconn.send_describe_portal(portal)
    [described] = execute_wait(pgconn)
    assert described.status == pq.ExecStatus.FATAL_ERROR
@pytest.mark.libpq("< 17")
def test_send_close_portal_no_close(pgconn):
    """Where the `libpq("< 17")` marker applies, closing is not supported."""
    expect_unsupported = pytest.raises(psycopg.NotSupportedError)
    with expect_unsupported:
        pgconn.send_close_portal(b"cur")
|