File: test_query.py

package info (click to toggle)
pg8000 1.31.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 836 kB
  • sloc: python: 8,273; sh: 25; makefile: 9
file content (241 lines) | stat: -rw-r--r-- 6,472 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import pytest

from pg8000.native import DatabaseError, to_statement


# Tests relating to the basic operation of the database driver, driven by the
# pg8000 custom interface.


@pytest.fixture
def db_table(request, con):
    con.run(
        "CREATE TEMPORARY TABLE t1 (f1 int primary key, "
        "f2 bigint not null, f3 varchar(50) null) "
    )

    def fin():
        try:
            con.run("drop table t1")
        except DatabaseError:
            pass

    request.addfinalizer(fin)
    return con


def test_database_error(con):
    with pytest.raises(DatabaseError):
        con.run("INSERT INTO t99 VALUES (1, 2, 3)")


# Run a query on a table, alter the structure of the table, then run the
# original query again.


def test_alter(db_table):
    db_table.run("select * from t1")
    db_table.run("alter table t1 drop column f3")
    db_table.run("select * from t1")


# Run a query on a table, drop then re-create the table, then run the
# original query again.


def test_create(db_table):
    db_table.run("select * from t1")
    db_table.run("drop table t1")
    db_table.run("create temporary table t1 (f1 int primary key)")
    db_table.run("select * from t1")


def test_parametrized(db_table):
    res = db_table.run("SELECT f1, f2, f3 FROM t1 WHERE f1 > :f1", f1=3)
    for row in res:
        f1, f2, f3 = row


def test_insert_returning(db_table):
    db_table.run("CREATE TEMPORARY TABLE t2 (id serial, data text)")

    # Test INSERT ... RETURNING with one row...
    res = db_table.run("INSERT INTO t2 (data) VALUES (:v) RETURNING id", v="test1")
    row_id = res[0][0]
    res = db_table.run("SELECT data FROM t2 WHERE id = :v", v=row_id)
    assert "test1" == res[0][0]

    assert db_table.row_count == 1

    # Test with multiple rows...
    res = db_table.run(
        "INSERT INTO t2 (data) VALUES (:v1), (:v2), (:v3) " "RETURNING id",
        v1="test2",
        v2="test3",
        v3="test4",
    )
    assert db_table.row_count == 3
    ids = [x[0] for x in res]
    assert len(ids) == 3


def test_row_count_select(db_table):
    expected_count = 57
    for i in range(expected_count):
        db_table.run(
            "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=i, v2=i, v3=None
        )

    db_table.run("SELECT * FROM t1")

    # Check row_count
    assert expected_count == db_table.row_count

    # Should be -1 for a command with no results
    db_table.run("DROP TABLE t1")
    assert -1 == db_table.row_count


def test_row_count_delete(db_table):
    db_table.run(
        "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=1, v2=1, v3=None
    )
    db_table.run("DELETE FROM t1")
    assert db_table.row_count == 1


def test_row_count_update(db_table):
    db_table.run(
        "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=1, v2=1, v3=None
    )
    db_table.run(
        "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=2, v2=10, v3=None
    )
    db_table.run(
        "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=3, v2=100, v3=None
    )
    db_table.run(
        "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=4, v2=1000, v3=None
    )
    db_table.run(
        "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=5, v2=10000, v3=None
    )
    db_table.run("UPDATE t1 SET f3 = :v1 WHERE f2 > 101", v1="Hello!")
    assert db_table.row_count == 2


def test_int_oid(con):
    # https://bugs.launchpad.net/pg8000/+bug/230796
    con.run("SELECT typname FROM pg_type WHERE oid = :v", v=100)


def test_unicode_query(con):
    con.run(
        "CREATE TEMPORARY TABLE \u043c\u0435\u0441\u0442\u043e "
        "(\u0438\u043c\u044f VARCHAR(50), "
        "\u0430\u0434\u0440\u0435\u0441 VARCHAR(250))"
    )


def test_transactions(db_table):
    db_table.run("start transaction")
    db_table.run(
        "INSERT INTO t1 (f1, f2, f3) VALUES (:v1, :v2, :v3)", v1=1, v2=1, v3="Zombie"
    )
    db_table.run("rollback")
    db_table.run("select * from t1")

    assert db_table.row_count == 0


def test_in(con):
    ret = con.run("SELECT typname FROM pg_type WHERE oid = any(:v)", v=[16, 23])
    assert ret[0][0] == "bool"


def test_empty_query(con):
    """No exception thrown"""
    con.run("")


def test_rollback_no_transaction(con):
    # Remove any existing notices
    con.notices.clear()

    # First, verify that a raw rollback does produce a notice
    con.run("rollback")

    assert 1 == len(con.notices)

    # 25P01 is the code for no_active_sql_tronsaction. It has
    # a message and severity name, but those might be
    # localized/depend on the server version.
    assert con.notices.pop().get(b"C") == b"25P01"


def test_close_prepared_statement(con):
    ps = con.prepare("select 1")
    ps.run()
    res = con.run("select count(*) from pg_prepared_statements")
    assert res[0][0] == 1  # Should have one prepared statement

    ps.close()

    res = con.run("select count(*) from pg_prepared_statements")
    assert res[0][0] == 0  # Should have no prepared statements


def test_no_data(con):
    assert con.run("START TRANSACTION") is None


def test_multiple_statements(con):
    statements = "SELECT 5; SELECT 'Erich Fromm';"
    assert con.run(statements) == [[5], ["Erich Fromm"]]


def test_unexecuted_connection_row_count(con):
    assert con.row_count is None


def test_unexecuted_connection_columns(con):
    assert con.columns is None


def test_sql_prepared_statement(con):
    con.run("PREPARE gen_series AS SELECT generate_series(1, 10);")
    con.run("EXECUTE gen_series")


def test_to_statement():
    new_query, _ = to_statement(
        "SELECT sum(x)::decimal(5, 2) :f_2, :f1 FROM t WHERE a=:f_2"
    )
    expected = "SELECT sum(x)::decimal(5, 2) $1, $2 FROM t WHERE a=$1"
    assert new_query == expected


def test_to_statement_quotes():
    new_query, _ = to_statement("SELECT $$'$$ = :v")
    expected = "SELECT $$'$$ = $1"
    assert new_query == expected


def test_not_parsed_if_no_params(mocker, con):
    mock_to_statement = mocker.patch("pg8000.native.to_statement")
    con.run("ROLLBACK")
    mock_to_statement.assert_not_called()


def test_max_parameters(con):
    SIZE = 60000
    kwargs = {f"param_{i}": 1 for i in range(SIZE)}
    con.run(
        f"SELECT 1 WHERE 1 IN ({','.join([f':param_{i}' for i in range(SIZE)])})",
        **kwargs,
    )


def test_pg_placeholder_style(con):
    rows = con.run("SELECT $1", title="A Time Of Hope")
    assert rows[0] == ["A Time Of Hope"]