1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file 'test_cursor_client_async.py'
# DO NOT CHANGE! Change the original file instead.
import datetime as dt
import pytest
import psycopg
from psycopg import rows
from .fix_crdb import crdb_encoding
@pytest.fixture
def conn(conn, anyio_backend):
conn.cursor_factory = psycopg.ClientCursor
return conn
def test_default_cursor(conn):
cur = conn.cursor()
assert type(cur) is psycopg.ClientCursor
def test_str(conn):
cur = conn.cursor()
assert "psycopg.%s" % psycopg.ClientCursor.__name__ in str(cur)
def test_from_cursor_factory(conn_cls, dsn):
with conn_cls.connect(dsn, cursor_factory=psycopg.ClientCursor) as conn:
cur = conn.cursor()
assert type(cur) is psycopg.ClientCursor
def test_execute_many_results_param(conn):
cur = conn.cursor()
assert cur.nextset() is None
rv = cur.execute("select %s; select generate_series(1, %s)", ("foo", 3))
assert rv is cur
assert cur.fetchall() == [("foo",)]
assert cur.rowcount == 1
assert cur.nextset()
assert cur.fetchall() == [(1,), (2,), (3,)]
assert cur.nextset() is None
cur.close()
assert cur.nextset() is None
def test_query_params_execute(conn):
cur = conn.cursor()
assert cur._query is None
cur.execute("select %t, %s::text", [1, None])
assert cur._query is not None
assert cur._query.query == b"select 1, NULL::text"
assert cur._query.params == (b"1", b"NULL")
cur.execute("select 1")
assert cur._query.query == b"select 1"
assert not cur._query.params
with pytest.raises(psycopg.DataError):
cur.execute("select %t::int", ["wat"])
assert cur._query.query == b"select 'wat'::int"
assert cur._query.params == (b"'wat'",)
def test_query_params_executemany(conn):
cur = conn.cursor()
cur.executemany("select %t, %t", [[1, 2], [3, 4]])
assert cur._query.query == b"select 3, 4"
assert cur._query.params == (b"3", b"4")
@pytest.mark.slow
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
def test_leak(conn_cls, dsn, faker, fetch, row_factory, gc):
faker.choose_schema(ncols=5)
faker.make_records(10)
row_factory = getattr(rows, row_factory)
def work():
with conn_cls.connect(dsn) as conn, conn.transaction(force_rollback=True):
with psycopg.ClientCursor(conn, row_factory=row_factory) as cur:
cur.execute(faker.drop_stmt)
cur.execute(faker.create_stmt)
with faker.find_insert_problem(conn):
cur.executemany(faker.insert_stmt, faker.records)
cur.execute(faker.select_stmt)
if fetch == "one":
while cur.fetchone() is not None:
pass
elif fetch == "many":
while cur.fetchmany(3):
pass
elif fetch == "all":
cur.fetchall()
elif fetch == "iter":
for rec in cur:
pass
n = []
gc.collect()
for i in range(3):
work()
gc.collect()
n.append(gc.count())
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
@pytest.mark.parametrize(
"query, params, want",
[
("select 'hello'", (), "select 'hello'"),
("select %s, %s", ([1, dt.date(2020, 1, 1)],), "select 1, '2020-01-01'::date"),
("select %(foo)s, %(foo)s", ({"foo": "x"},), "select 'x', 'x'"),
("select %%", (), "select %%"),
("select %%, %s", (["a"],), "select %, 'a'"),
("select %%, %(foo)s", ({"foo": "x"},), "select %, 'x'"),
("select %%s, %(foo)s", ({"foo": "x"},), "select %s, 'x'"),
],
)
def test_mogrify(conn, query, params, want):
cur = conn.cursor()
got = cur.mogrify(query, *params)
assert got == want
@pytest.mark.parametrize("encoding", ["utf8", crdb_encoding("latin9")])
def test_mogrify_encoding(conn, encoding):
conn.execute(f"set client_encoding to {encoding}")
q = conn.cursor().mogrify("select %(s)s", {"s": "€"})
assert q == "select '€'"
@pytest.mark.parametrize("encoding", [crdb_encoding("latin1")])
def test_mogrify_badenc(conn, encoding):
conn.execute(f"set client_encoding to {encoding}")
with pytest.raises(UnicodeEncodeError):
conn.cursor().mogrify("select %(s)s", {"s": "€"})
|