1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
import pytest
import psycopg
from psycopg import rows
from .utils import eur
def test_tuple_row(conn):
conn.row_factory = rows.dict_row
assert conn.execute("select 1 as a").fetchone() == {"a": 1}
cur = conn.cursor(row_factory=rows.tuple_row)
row = cur.execute("select 1 as a").fetchone()
assert row == (1,)
assert type(row) is tuple
assert cur._make_row is tuple
def test_dict_row(conn):
cur = conn.cursor(row_factory=rows.dict_row)
cur.execute("select 'bob' as name, 3 as id")
assert cur.fetchall() == [{"name": "bob", "id": 3}]
cur.execute("select 'a' as letter; select 1 as number")
assert cur.fetchall() == [{"letter": "a"}]
assert cur.nextset()
assert cur.fetchall() == [{"number": 1}]
assert not cur.nextset()
def test_namedtuple_row(conn):
rows._make_nt.cache_clear()
cur = conn.cursor(row_factory=rows.namedtuple_row)
cur.execute("select 'bob' as name, 3 as id")
(person1,) = cur.fetchall()
assert f"{person1.name} {person1.id}" == "bob 3"
ci1 = rows._make_nt.cache_info()
assert ci1.hits == 0 and ci1.misses == 1
cur.execute("select 'alice' as name, 1 as id")
(person2,) = cur.fetchall()
assert type(person2) is type(person1)
ci2 = rows._make_nt.cache_info()
assert ci2.hits == 1 and ci2.misses == 1
cur.execute("select 'foo', 1 as id")
(r0,) = cur.fetchall()
assert r0.f_column_ == "foo"
assert r0.id == 1
cur.execute("select 'a' as letter; select 1 as number")
(r1,) = cur.fetchall()
assert r1.letter == "a"
assert cur.nextset()
(r2,) = cur.fetchall()
assert r2.number == 1
assert not cur.nextset()
assert type(r1) is not type(r2)
cur.execute(f'select 1 as üåäö, 2 as _, 3 as "123", 4 as "a-b", 5 as "{eur}eur"')
(r3,) = cur.fetchall()
assert r3.üåäö == 1
assert r3.f_ == 2
assert r3.f123 == 3
assert r3.a_b == 4
assert r3.f_eur == 5
def test_class_row(conn):
cur = conn.cursor(row_factory=rows.class_row(Person))
cur.execute("select 'John' as first, 'Doe' as last")
(p,) = cur.fetchall()
assert isinstance(p, Person)
assert p.first == "John"
assert p.last == "Doe"
assert p.age is None
for query in (
"select 'John' as first",
"select 'John' as first, 'Doe' as last, 42 as wat",
):
cur.execute(query)
with pytest.raises(TypeError):
cur.fetchone()
def test_args_row(conn):
cur = conn.cursor(row_factory=rows.args_row(argf))
cur.execute("select 'John' as first, 'Doe' as last")
assert cur.fetchone() == "JohnDoe"
def test_kwargs_row(conn):
cur = conn.cursor(row_factory=rows.kwargs_row(kwargf))
cur.execute("select 'John' as first, 'Doe' as last")
(p,) = cur.fetchall()
assert isinstance(p, Person)
assert p.first == "John"
assert p.last == "Doe"
assert p.age == 42
def test_scalar_row(conn):
cur = conn.cursor(row_factory=rows.scalar_row)
cur.execute("select 1")
assert cur.fetchone() == 1
cur.execute("select 1, 2")
assert cur.fetchone() == 1
with pytest.raises(psycopg.ProgrammingError):
cur.execute("select")
@pytest.mark.parametrize(
"factory", "tuple_row dict_row namedtuple_row class_row args_row kwargs_row".split()
)
def test_no_result(factory, conn):
cur = conn.cursor(row_factory=factory_from_name(factory))
cur.execute("reset search_path")
with pytest.raises(psycopg.ProgrammingError):
cur.fetchone()
@pytest.mark.crdb_skip("no col query")
@pytest.mark.parametrize(
"factory", "tuple_row dict_row namedtuple_row args_row".split()
)
def test_no_column(factory, conn):
cur = conn.cursor(row_factory=factory_from_name(factory))
cur.execute("select")
recs = cur.fetchall()
assert len(recs) == 1
assert not recs[0]
@pytest.mark.crdb("skip")
def test_no_column_class_row(conn):
class Empty:
def __init__(self, x=10, y=20):
self.x = x
self.y = y
cur = conn.cursor(row_factory=rows.class_row(Empty))
cur.execute("select")
x = cur.fetchone()
assert isinstance(x, Empty)
assert x.x == 10
assert x.y == 20
def factory_from_name(name):
if (factory := getattr(rows, name)) is rows.class_row:
factory = factory(Person)
if factory is rows.args_row:
factory = factory(argf)
if factory is rows.kwargs_row:
factory = factory(argf)
return factory
class Person:
def __init__(self, first, last, age=None):
self.first = first
self.last = last
self.age = age
def argf(*args):
return "".join(map(str, args))
def kwargf(**kwargs):
return Person(**kwargs, age=42)
|