File: test_typeinfo.py

package info (click to toggle)
psycopg3 3.3.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,836 kB
  • sloc: python: 46,657; sh: 403; ansic: 149; makefile: 73
file content (204 lines) | stat: -rw-r--r-- 6,412 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import pytest

import psycopg
from psycopg import sql
from psycopg.pq import TransactionStatus
from psycopg.types import TypeInfo
from psycopg.types.enum import EnumInfo
from psycopg.types.range import RangeInfo
from psycopg.types.composite import CompositeInfo
from psycopg.types.multirange import MultirangeInfo

from .fix_crdb import crdb_encoding


@pytest.mark.parametrize("name", ["text", sql.Identifier("text")])
@pytest.mark.parametrize("status", ["IDLE", "INTRANS", None])
@pytest.mark.parametrize(
    "encoding", ["utf8", crdb_encoding("latin1"), crdb_encoding("sql_ascii")]
)
def test_fetch(conn, name, status, encoding):
    """Fetch the builtin "text" type and compare it with the global registry.

    The lookup is repeated for a plain-string and an `sql.Identifier` name,
    for several client encodings, and with the connection either idle, in a
    transaction, or in autocommit mode (``status`` is None). In every case
    `TypeInfo.fetch()` must leave the transaction status as it found it.
    """
    with conn.transaction():
        conn.execute("select set_config('client_encoding', %s, false)", [encoding])

    if not status:
        # No status requested: exercise the autocommit code path instead.
        conn.autocommit = True
        expected_status = TransactionStatus.IDLE
    else:
        expected_status = getattr(TransactionStatus, status)
        if expected_status == TransactionStatus.INTRANS:
            # Run a statement so the fetch happens inside a transaction.
            conn.execute("select 1")

    # The status must be the expected one both before and after the fetch.
    assert conn.info.transaction_status == expected_status
    info = TypeInfo.fetch(conn, name)
    assert conn.info.transaction_status == expected_status

    assert info.name == "text"
    # TODO: add the schema?
    # assert info.schema == "pg_catalog"

    builtin_text = psycopg.adapters.types["text"]
    assert info.oid == builtin_text.oid
    assert info.array_oid == builtin_text.array_oid
    assert info.regtype == "text"


@pytest.mark.parametrize("name", ["text", sql.Identifier("text")])
@pytest.mark.parametrize("status", ["IDLE", "INTRANS", None])
@pytest.mark.parametrize(
    "encoding", ["utf8", crdb_encoding("latin1"), crdb_encoding("sql_ascii")]
)
async def test_fetch_async(aconn, name, status, encoding):
    """Async counterpart of `test_fetch`: fetch the builtin "text" type.

    The lookup is repeated for a plain-string and an `sql.Identifier` name,
    for several client encodings, and with the connection either idle, in a
    transaction, or in autocommit mode (``status`` is None). In every case
    `TypeInfo.fetch()` must leave the transaction status as it found it.
    """
    async with aconn.transaction():
        await aconn.execute(
            "select set_config('client_encoding', %s, false)", [encoding]
        )

    if status:
        if (status := getattr(TransactionStatus, status)) == TransactionStatus.INTRANS:
            # Run a statement so the fetch happens inside a transaction.
            await aconn.execute("select 1")
    else:
        # No status requested: exercise the autocommit code path instead.
        await aconn.set_autocommit(True)
        status = TransactionStatus.IDLE

    # The status must be the expected one both before and after the fetch.
    assert aconn.info.transaction_status == status
    info = await TypeInfo.fetch(aconn, name)
    assert aconn.info.transaction_status == status

    assert info.name == "text"
    # TODO: add the schema?
    # assert info.schema == "pg_catalog"

    assert info.oid == psycopg.adapters.types["text"].oid
    assert info.array_oid == psycopg.adapters.types["text"].array_oid
    # Keep in step with the sync test, which also verifies the regtype.
    assert info.regtype == "text"


# Parametrizations shared by the sync and async "not found" tests below.
#
# `name`: a type name the tests expect to be unknown to the server, passed
# both as a plain string and as an SQL identifier.
_name = pytest.mark.parametrize("name", ["nosuch", sql.Identifier("nosuch")])
# `status`: name of the TransactionStatus member the connection should have
# when fetch() is called.
_status = pytest.mark.parametrize("status", ["IDLE", "INTRANS"])
# `info_cls`: every TypeInfo flavour imported by this module.
# NOTE(review): `crdb_skip` and `pg` are project-defined marks; presumably they
# skip the case on CockroachDB and restrict it to the given PostgreSQL server
# versions respectively -- confirm against the test suite's conftest.
_info_cls = pytest.mark.parametrize(
    "info_cls",
    [
        pytest.param(TypeInfo),
        pytest.param(RangeInfo, marks=pytest.mark.crdb_skip("range")),
        pytest.param(
            MultirangeInfo,
            marks=(pytest.mark.crdb_skip("range"), pytest.mark.pg(">= 14")),
        ),
        pytest.param(CompositeInfo, marks=pytest.mark.crdb_skip("composite")),
        pytest.param(EnumInfo),
    ],
)


@_name
@_status
@_info_cls
def test_fetch_not_found(conn, name, status, info_cls, monkeypatch):
    """Fetching an unknown type returns None and keeps the transaction status.

    When `TypeInfo._has_to_regtype_function()` is true for the connection,
    `Transaction.__exit__` is additionally wrapped to assert that no
    exception travels through a transaction block during the lookup.
    """
    expected_status = getattr(TransactionStatus, status)

    if TypeInfo._has_to_regtype_function(conn):
        wrapped_exit = psycopg.Transaction.__exit__

        def checked_exit(self, exc_type, exc_val, exc_tb):
            # Fail the test if the block is being left because of an error.
            assert exc_val is None
            return wrapped_exit(self, exc_type, exc_val, exc_tb)

        monkeypatch.setattr(psycopg.Transaction, "__exit__", checked_exit)

    if expected_status == TransactionStatus.INTRANS:
        # Run a statement so the fetch happens inside a transaction.
        conn.execute("select 1")

    assert conn.info.transaction_status == expected_status
    info = info_cls.fetch(conn, name)
    assert conn.info.transaction_status == expected_status
    assert info is None


@_name
@_status
@_info_cls
async def test_fetch_not_found_async(aconn, name, status, info_cls, monkeypatch):
    """Async variant: fetching an unknown type returns None, status unchanged.

    When `TypeInfo._has_to_regtype_function()` is true for the connection,
    `AsyncTransaction.__aexit__` is additionally wrapped to assert that no
    exception travels through a transaction block during the lookup.
    """
    expected_status = getattr(TransactionStatus, status)

    if TypeInfo._has_to_regtype_function(aconn):
        wrapped_aexit = psycopg.AsyncTransaction.__aexit__

        async def checked_aexit(self, exc_type, exc_val, exc_tb):
            # Fail the test if the block is being left because of an error.
            assert exc_val is None
            return await wrapped_aexit(self, exc_type, exc_val, exc_tb)

        monkeypatch.setattr(psycopg.AsyncTransaction, "__aexit__", checked_aexit)

    if expected_status == TransactionStatus.INTRANS:
        # Run a statement so the fetch happens inside a transaction.
        await aconn.execute("select 1")

    assert aconn.info.transaction_status == expected_status
    info = await info_cls.fetch(aconn, name)
    assert aconn.info.transaction_status == expected_status

    assert info is None


@pytest.mark.crdb_skip("composite")
@pytest.mark.parametrize(
    "name", ["testschema.testtype", sql.Identifier("testschema", "testtype")]
)
def test_fetch_by_schema_qualified_string(conn, name):
    """Fetch a composite type living in a non-default schema.

    The type is created in ``testschema`` and looked up by its qualified
    name, given either as a dotted string or as a two-part identifier. The
    oids reported by `TypeInfo.fetch()` must match the ones in ``pg_type``.
    """
    for statement in (
        "create schema if not exists testschema",
        "create type testschema.testtype as (foo text)",
    ):
        conn.execute(statement)

    info = TypeInfo.fetch(conn, name)
    assert info.name == "testtype"
    # assert info.schema == "testschema"

    catalog_query = """
        select oid, typarray from pg_type
        where oid = 'testschema.testtype'::regtype
        """
    catalog_row = conn.execute(catalog_query).fetchone()
    assert catalog_row == (info.oid, info.array_oid)


@pytest.mark.parametrize(
    "name",
    [
        "text",
        # TODO: support these?
        # "pg_catalog.text",
        # sql.Identifier("text"),
        # sql.Identifier("pg_catalog", "text"),
    ],
)
def test_registry_by_builtin_name(conn, name):
    """Look up the builtin "text" type in the global registry by name."""
    text_info = psycopg.adapters.types[name]
    assert (text_info.name, text_info.oid) == ("text", 25)


def test_registry_empty():
    """A registry created without a template knows no type at all.

    `get()` reports the missing entry as None, item access raises KeyError.
    """
    registry = psycopg.types.TypesRegistry()
    missing = registry.get("text")
    assert missing is None
    with pytest.raises(KeyError):
        registry["text"]


@pytest.mark.parametrize("oid, aoid", [(1, 2), (1, 0), (0, 2), (0, 0)])
def test_registry_invalid_oid(oid, aoid):
    """Register a type whose oid and/or array oid may be zero.

    The entry must be reachable by name and by each non-zero oid, while the
    key 0 must never resolve to anything.
    """
    registry = psycopg.types.TypesRegistry()
    ti = psycopg.types.TypeInfo("test", oid, aoid)
    registry.add(ti)

    assert registry["test"] is ti
    for key in (oid, aoid):
        if key:
            assert registry[key] is ti

    with pytest.raises(KeyError):
        registry[0]


def test_registry_copy():
    """A registry built from the PostgreSQL one exposes its builtin types.

    The "text" entry must be the very same object whether obtained through
    `get()`, by name, or by oid.
    """
    registry = psycopg.types.TypesRegistry(psycopg.postgres.types)

    via_get = registry.get("text")
    via_name = registry["text"]
    via_oid = registry[25]

    assert via_get is via_name
    assert via_name is via_oid
    assert registry["text"].oid == 25


def test_registry_isolated():
    """Adding to a registry built from another one leaves the parent intact."""
    parent = psycopg.postgres.types
    text_info = parent["text"]

    child = psycopg.types.TypesRegistry(parent)
    dummy = psycopg.types.TypeInfo("dummy", text_info.oid, text_info.array_oid)
    child.add(dummy)

    # In the child, oid 25 and the new name both give the new entry...
    assert child[25] is dummy
    assert child["dummy"] is dummy
    # ...while the parent's oid 25 and the child's "text" give the original.
    assert parent[25] is text_info
    assert child["text"] is text_info