# Tests for TypeInfo.fetch() lookups and for TypesRegistry behaviour.
import pytest
import psycopg
from psycopg import sql
from psycopg.pq import TransactionStatus
from psycopg.types import TypeInfo
from psycopg.types.enum import EnumInfo
from psycopg.types.range import RangeInfo
from psycopg.types.composite import CompositeInfo
from psycopg.types.multirange import MultirangeInfo
from .fix_crdb import crdb_encoding
@pytest.mark.parametrize("name", ["text", sql.Identifier("text")])
@pytest.mark.parametrize("status", ["IDLE", "INTRANS", None])
@pytest.mark.parametrize(
    "encoding", ["utf8", crdb_encoding("latin1"), crdb_encoding("sql_ascii")]
)
def test_fetch(conn, name, status, encoding):
    """Fetch the ``text`` type and check the transaction status is untouched.

    The client encoding is first set through ``set_config()`` inside a
    transaction block. The connection is then prepared according to *status*:

    - ``"IDLE"``: nothing else is executed;
    - ``"INTRANS"``: one statement is executed beforehand;
    - ``None``: the connection is switched to autocommit and the expected
      status is ``IDLE``.

    ``TypeInfo.fetch()`` must leave ``conn.info.transaction_status`` as it
    found it and return an entry whose name, oids and regtype agree with the
    ``text`` entry of ``psycopg.adapters.types``.
    """
    with conn.transaction():
        conn.execute("select set_config('client_encoding', %s, false)", [encoding])

    if not status:
        conn.autocommit = True
        expected = TransactionStatus.IDLE
    else:
        # Resolve the parametrized member name into the enum value.
        expected = getattr(TransactionStatus, status)
        if expected == TransactionStatus.INTRANS:
            # Run a statement so that the connection is expected to report
            # INTRANS before the fetch.
            conn.execute("select 1")

    assert conn.info.transaction_status == expected
    info = TypeInfo.fetch(conn, name)
    assert conn.info.transaction_status == expected

    assert info.name == "text"
    # TODO: add the schema?
    # assert info.schema == "pg_catalog"

    builtin_text = psycopg.adapters.types["text"]
    assert info.oid == builtin_text.oid
    assert info.array_oid == psycopg.adapters.types["text"].array_oid
    assert info.regtype == "text"
@pytest.mark.parametrize("name", ["text", sql.Identifier("text")])
@pytest.mark.parametrize("status", ["IDLE", "INTRANS", None])
@pytest.mark.parametrize(
    "encoding", ["utf8", crdb_encoding("latin1"), crdb_encoding("sql_ascii")]
)
async def test_fetch_async(aconn, name, status, encoding):
    """Async twin of ``test_fetch``: fetch ``text`` without changing the status.

    The client encoding is first set through ``set_config()`` inside a
    transaction block. The connection is then prepared according to *status*:

    - ``"IDLE"``: nothing else is executed;
    - ``"INTRANS"``: one statement is executed beforehand;
    - ``None``: the connection is switched to autocommit and the expected
      status is ``IDLE``.

    ``TypeInfo.fetch()`` must leave ``aconn.info.transaction_status`` as it
    found it and return an entry whose name, oids and regtype agree with the
    ``text`` entry of ``psycopg.adapters.types``.
    """
    async with aconn.transaction():
        await aconn.execute(
            "select set_config('client_encoding', %s, false)", [encoding]
        )

    if status:
        # Resolve the parametrized member name into the enum value.
        status = getattr(TransactionStatus, status)
        if status == TransactionStatus.INTRANS:
            # Run a statement so that the connection is expected to report
            # INTRANS before the fetch.
            await aconn.execute("select 1")
    else:
        await aconn.set_autocommit(True)
        status = TransactionStatus.IDLE

    assert aconn.info.transaction_status == status
    info = await TypeInfo.fetch(aconn, name)
    assert aconn.info.transaction_status == status

    assert info.name == "text"
    # TODO: add the schema?
    # assert info.schema == "pg_catalog"

    assert info.oid == psycopg.adapters.types["text"].oid
    assert info.array_oid == psycopg.adapters.types["text"].array_oid
    # Same check as the sync test: the async fetch must populate regtype too.
    assert info.regtype == "text"
# Shared parametrizations, applied as decorators to the "not found" tests.

# A type name expected not to be found, given both as a plain string and as
# an sql.Identifier.
_name = pytest.mark.parametrize("name", ["nosuch", sql.Identifier("nosuch")])

# Names of TransactionStatus members; the tests resolve them with getattr().
_status = pytest.mark.parametrize("status", ["IDLE", "INTRANS"])

# Info classes whose fetch() is exercised.
# NOTE(review): crdb_skip and pg are custom marks defined outside this file;
# presumably they skip the case on CockroachDB and restrict it to a server
# version range respectively - confirm against the test suite configuration.
_info_cls = pytest.mark.parametrize(
    "info_cls",
    [
        pytest.param(TypeInfo),
        pytest.param(RangeInfo, marks=pytest.mark.crdb_skip("range")),
        pytest.param(
            MultirangeInfo,
            marks=(pytest.mark.crdb_skip("range"), pytest.mark.pg(">= 14")),
        ),
        pytest.param(CompositeInfo, marks=pytest.mark.crdb_skip("composite")),
        pytest.param(EnumInfo),
    ],
)
@_name
@_status
@_info_cls
def test_fetch_not_found(conn, name, status, info_cls, monkeypatch):
    """Fetching an unknown type returns None and keeps the transaction status.

    When ``TypeInfo._has_to_regtype_function(conn)`` is true,
    ``Transaction.__exit__`` is wrapped so that the test fails if any
    exception reaches the end of a transaction block during the fetch.
    """
    if TypeInfo._has_to_regtype_function(conn):
        original_exit = psycopg.Transaction.__exit__

        def checked_exit(self, exc_type, exc_val, exc_tb):
            # No exception is allowed to propagate out of the block.
            assert exc_val is None
            return original_exit(self, exc_type, exc_val, exc_tb)

        monkeypatch.setattr(psycopg.Transaction, "__exit__", checked_exit)

    # Resolve the parametrized member name into the enum value.
    expected = getattr(TransactionStatus, status)
    if expected == TransactionStatus.INTRANS:
        # Run a statement so that the connection is expected to report INTRANS.
        conn.execute("select 1")

    assert conn.info.transaction_status == expected
    found = info_cls.fetch(conn, name)
    assert conn.info.transaction_status == expected
    assert found is None
@_name
@_status
@_info_cls
async def test_fetch_not_found_async(aconn, name, status, info_cls, monkeypatch):
    """Async fetch of an unknown type returns None and keeps the status.

    When ``TypeInfo._has_to_regtype_function(aconn)`` is true,
    ``AsyncTransaction.__aexit__`` is wrapped so that the test fails if any
    exception reaches the end of a transaction block during the fetch.
    """
    if TypeInfo._has_to_regtype_function(aconn):
        original_aexit = psycopg.AsyncTransaction.__aexit__

        async def checked_aexit(self, exc_type, exc_val, exc_tb):
            # No exception is allowed to propagate out of the block.
            assert exc_val is None
            return await original_aexit(self, exc_type, exc_val, exc_tb)

        monkeypatch.setattr(psycopg.AsyncTransaction, "__aexit__", checked_aexit)

    # Resolve the parametrized member name into the enum value.
    expected = getattr(TransactionStatus, status)
    if expected == TransactionStatus.INTRANS:
        # Run a statement so that the connection is expected to report INTRANS.
        await aconn.execute("select 1")

    assert aconn.info.transaction_status == expected
    found = await info_cls.fetch(aconn, name)
    assert aconn.info.transaction_status == expected
    assert found is None
@pytest.mark.crdb_skip("composite")
@pytest.mark.parametrize(
    "name", ["testschema.testtype", sql.Identifier("testschema", "testtype")]
)
def test_fetch_by_schema_qualified_string(conn, name):
    """Fetch a composite type living in a non-default schema by qualified name.

    The oid and array oid reported by ``TypeInfo.fetch()`` are compared with
    the ``pg_type`` row selected through a ``regtype`` cast of the same name.
    """
    conn.execute("create schema if not exists testschema")
    conn.execute("create type testschema.testtype as (foo text)")

    fetched = TypeInfo.fetch(conn, name)
    assert fetched.name == "testtype"
    # assert info.schema == "testschema"

    # The literal's text is left flush-left on purpose: re-indenting it would
    # change the string sent to the server.
    catalog_cur = conn.execute(
        """
select oid, typarray from pg_type
where oid = 'testschema.testtype'::regtype
"""
    )
    assert catalog_cur.fetchone() == (fetched.oid, fetched.array_oid)
@pytest.mark.parametrize(
    "name",
    [
        "text",
        # TODO: support these?
        # "pg_catalog.text",
        # sql.Identifier("text"),
        # sql.Identifier("pg_catalog", "text"),
    ],
)
def test_registry_by_builtin_name(conn, name):
    """Look up the builtin ``text`` type in the global registry by name."""
    entry = psycopg.adapters.types[name]
    assert entry.name == "text"
    assert entry.oid == 25
def test_registry_empty():
    """A registry created without a template knows no type."""
    registry = psycopg.types.TypesRegistry()

    # get() reports a miss with None...
    assert registry.get("text") is None

    # ...whereas subscription raises.
    with pytest.raises(KeyError):
        registry["text"]
@pytest.mark.parametrize("oid, aoid", [(1, 2), (1, 0), (0, 2), (0, 0)])
def test_registry_invalid_oid(oid, aoid):
    """A zero oid or array oid is never usable as a registry key.

    The type stays reachable by name and by whichever of its oids is
    non-zero, while looking up ``0`` raises ``KeyError``.
    """
    registry = psycopg.types.TypesRegistry()
    type_info = psycopg.types.TypeInfo("test", oid, aoid)
    registry.add(type_info)

    assert registry["test"] is type_info
    for key in (oid, aoid):
        if key:
            assert registry[key] is type_info

    with pytest.raises(KeyError):
        registry[0]
def test_registry_copy():
    """A registry built on a template resolves the template's entries."""
    registry = psycopg.types.TypesRegistry(psycopg.postgres.types)

    # get(), lookup by name and lookup by oid must yield the same object.
    via_get = registry.get("text")
    via_name = registry["text"]
    assert via_get is via_name
    assert via_name is registry[25]

    assert registry["text"].oid == 25
def test_registry_isolated():
    """Adding to a derived registry does not alter the template registry.

    A dummy type reusing the oids of ``text`` is added to the derived
    registry: oid 25 then resolves to the dummy there, while the template
    still resolves it to the original ``text`` entry.
    """
    template = psycopg.postgres.types
    text_info = template["text"]

    derived = psycopg.types.TypesRegistry(template)
    dummy_info = psycopg.types.TypeInfo("dummy", text_info.oid, text_info.array_oid)
    derived.add(dummy_info)

    # In the derived registry the oid now points to the dummy type.
    by_oid = derived[25]
    by_name = derived["dummy"]
    assert by_oid is by_name
    assert by_name is dummy_info

    # The template is untouched, and "text" is still reachable by name from
    # the derived registry.
    template_by_oid = template[25]
    derived_text = derived["text"]
    assert template_by_oid is derived_text
    assert derived_text is text_info
|