File: test_sqlite3.py

package info (click to toggle)
pypy 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 107,216 kB
  • sloc: python: 1,201,787; ansic: 62,419; asm: 5,169; cpp: 3,017; sh: 2,534; makefile: 545; xml: 243; lisp: 45; awk: 4
file content (324 lines) | stat: -rw-r--r-- 10,332 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# -*- coding: utf-8 -*-
"""Tests for _sqlite3.py"""

from __future__ import absolute_import
import pytest
import sys

# Import the module under test; pytest skips the whole test module when
# '_sqlite3' cannot be imported.
_sqlite3 = pytest.importorskip('_sqlite3')

# Marker for tests that are skipped unless '__pypy__' is listed among the
# interpreter's built-in modules.
pypy_only = pytest.mark.skipif('__pypy__' not in sys.builtin_module_names,
    reason="PyPy-only test")


@pytest.yield_fixture
def con():
    """Yield a fresh in-memory connection and close it after the test."""
    connection = _sqlite3.connect(':memory:')
    yield connection
    # Teardown: executed once the test using the fixture has finished.
    connection.close()


def test_list_ddl(con):
    """From issue996.  Mostly just looking for lack of exceptions.

    Iterating the cursor after a CREATE TABLE or an INSERT must give no
    rows; iterating it after the SELECT must give the inserted row.
    """
    cur = con.cursor()
    cur.execute('CREATE TABLE foo (bar INTEGER)')
    assert list(cur) == []
    cur.execute('INSERT INTO foo (bar) VALUES (42)')
    assert list(cur) == []
    cur.execute('SELECT * FROM foo')
    assert list(cur) == [(42,)]

@pypy_only
def test_connect_takes_same_positional_args_as_Connection(con):
    """connect() and Connection.__init__ must name the same arguments."""
    import inspect
    init_spec = inspect.getargspec(_sqlite3.Connection.__init__)
    connect_spec = inspect.getargspec(_sqlite3.connect)
    # The method's leading 'self' has no counterpart in connect().
    assert init_spec.args[1:] == connect_spec.args

def test_total_changes_after_close(con):
    """Reading total_changes on a closed connection must fail."""
    expected_error = _sqlite3.ProgrammingError
    con.close()
    with pytest.raises(expected_error):
        con.total_changes

def test_connection_check_init():
    """A Connection whose base __init__ never ran must refuse to work.

    The ProgrammingError raised on use is expected to mention '__init__'.
    """
    class UninitializedConnection(_sqlite3.Connection):
        def __init__(self, name):
            # Deliberately skips the base class initializer.
            pass

    broken = UninitializedConnection(":memory:")
    with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
        broken.cursor()
    message = str(excinfo.value)
    assert '__init__' in message


def test_cursor_check_init(con):
    """A Cursor whose base __init__ never ran must refuse to work.

    The ProgrammingError raised on use is expected to mention '__init__'.
    """
    class UninitializedCursor(_sqlite3.Cursor):
        def __init__(self, name):
            # Deliberately skips the base class initializer.
            pass

    broken = UninitializedCursor(con)
    with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
        broken.execute('select 1')
    message = str(excinfo.value)
    assert '__init__' in message


def test_connection_after_close(con):
    """Calling a closed connection must report the closed state first."""
    # While the connection is open, calling it without arguments is
    # expected to fail as a plain argument error.
    with pytest.raises(TypeError):
        con()
    con.close()
    # raises ProgrammingError because should check closed before check args
    with pytest.raises(_sqlite3.ProgrammingError):
        con()

def test_cursor_iter(con):
    """Check when next() on a cursor gives a row and when it is exhausted."""
    cur = con.cursor()
    # A cursor that has not executed anything has nothing to iterate over.
    with pytest.raises(StopIteration):
        next(cur)

    # A one-row SELECT gives exactly one row, then the cursor is exhausted.
    cur.execute('select 1')
    next(cur)
    with pytest.raises(StopIteration):
        next(cur)

    # A commit between execute() and iteration must not lose the row.
    cur.execute('select 1')
    con.commit()
    next(cur)
    with pytest.raises(StopIteration):
        next(cur)

    # executemany() with a SELECT is expected to raise, and the cursor must
    # have no rows afterwards.
    with pytest.raises(_sqlite3.ProgrammingError):
        cur.executemany('select 1', [])
    with pytest.raises(StopIteration):
        next(cur)

    # Executing a CREATE TABLE on the same cursor must leave no rows from
    # the SELECT executed just before it.
    cur.execute('select 1')
    cur.execute('create table test(ing)')
    with pytest.raises(StopIteration):
        next(cur)

    # Same expectation for an INSERT followed by a commit.
    cur.execute('select 1')
    cur.execute('insert into test values(1)')
    con.commit()
    with pytest.raises(StopIteration):
        next(cur)

def test_cursor_after_close(con):
    """A cursor of a closed connection raises ProgrammingError on use."""
    cur = con.execute('select 1')
    # Close the cursor first, then the connection it belongs to.
    cur.close()
    con.close()
    # Closing the cursor again is expected to fail now that the connection
    # is closed.
    with pytest.raises(_sqlite3.ProgrammingError):
        cur.close()
    # raises ProgrammingError because should check closed before check args
    with pytest.raises(_sqlite3.ProgrammingError):
        cur.execute(1,2,3,4,5)
    with pytest.raises(_sqlite3.ProgrammingError):
        cur.executemany(1,2,3,4,5)

def test_connection_del(tmpdir):
    """For issue1325.

    With the open-file limit lowered, keeping three connections to the same
    database file alive is expected to fail with OperationalError, while
    dropping each connection and collecting garbage before opening the next
    one is expected to succeed.  Skipped when the 'resource' module is not
    available.
    """
    import os
    import gc
    resource = pytest.importorskip('resource')

    # Remember the current limits so the 'finally' clause can restore them.
    limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        # Raise the soft limit one descriptor at a time, starting from 1,
        # until a pipe can be created (and is closed again).  This leaves
        # the limit at the smallest value at which os.pipe() succeeds.
        fds = 0
        while True:
            fds += 1
            resource.setrlimit(resource.RLIMIT_NOFILE, (fds, limit[1]))
            try:
                for p in os.pipe(): os.close(p)
            except OSError:
                # Guard against looping forever if pipes never succeed.
                assert fds < 100
            else:
                break

        def open_many(cleanup):
            # Open three connections to the same file.  With cleanup, each
            # connection is dereferenced and garbage-collected right after
            # it was opened; without it, all of them stay referenced.
            con = []
            for i in range(3):
                con.append(_sqlite3.connect(str(tmpdir.join('test.db'))))
                if cleanup:
                    con[i] = None
                    gc.collect(); gc.collect()

        # Holding on to every connection must hit the lowered limit.
        with pytest.raises(_sqlite3.OperationalError):
            open_many(False)
        # Collect the connections left over from the failed attempt.
        gc.collect(); gc.collect()
        # Releasing each connection before opening the next must work.
        open_many(True)
    finally:
        resource.setrlimit(resource.RLIMIT_NOFILE, limit)

def test_on_conflict_rollback_executemany(con):
    """commit() must still work after an ON CONFLICT ROLLBACK was hit.

    Skipped for SQLite versions older than 3.2.2.
    """
    major, minor, micro = map(int, _sqlite3.sqlite_version.split('.')[:3])
    if (major, minor, micro) < (3, 2, 2):
        pytest.skip("requires sqlite3 version >= 3.2.2")
    con.execute("create table foo(x, unique(x) on conflict rollback)")
    con.execute("insert into foo(x) values (1)")
    try:
        # Inserts a value that is already present in the unique column.
        con.executemany("insert into foo(x) values (?)", [[1]])
    except _sqlite3.DatabaseError:
        # A database error here is tolerated; what matters is that the
        # connection stays usable afterwards.
        pass
    con.execute("insert into foo(x) values (2)")
    try:
        con.commit()
    except _sqlite3.OperationalError:
        pytest.fail("_sqlite3 knew nothing about the implicit ROLLBACK")

def test_statement_arg_checking(con):
    """Non-string SQL arguments are rejected with the expected messages."""
    not_sql = 123

    # Calling the connection itself reports a Warning.
    with pytest.raises(_sqlite3.Warning) as excinfo:
        con(not_sql)
    message = str(excinfo.value)
    assert message.startswith('SQL is of wrong type. Must be string')

    # execute() and executemany() report a ValueError with the same prefix.
    with pytest.raises(ValueError) as excinfo:
        con.execute(not_sql)
    message = str(excinfo.value)
    assert message.startswith('operation parameter must be str')

    with pytest.raises(ValueError) as excinfo:
        con.executemany(not_sql, 123)
    message = str(excinfo.value)
    assert message.startswith('operation parameter must be str')

    # executescript() has its own wording.
    with pytest.raises(ValueError) as excinfo:
        con.executescript(not_sql)
    message = str(excinfo.value)
    assert message.startswith('script argument must be unicode')

def test_statement_param_checking(con):
    """Check which parameter containers execute() accepts for a '?' slot."""
    insert = 'insert into foo(x) values (?)'
    con.execute('create table foo(x)')

    # A list and a tuple are both accepted.
    con.execute(insert, [2])
    con.execute(insert, (2,))

    class MinimalSequence(object):
        def __len__(self):
            return 1
        def __getitem__(self, index):
            return 2

    # An object offering __len__ and __getitem__ is accepted as well ...
    con.execute(insert, MinimalSequence())
    # ... but is expected to be rejected once __len__ has been removed.
    del MinimalSequence.__len__
    with pytest.raises(_sqlite3.ProgrammingError):
        con.execute(insert, MinimalSequence())

    # A dict is expected to be rejected for a positional placeholder.
    with pytest.raises(_sqlite3.ProgrammingError):
        con.execute(insert, {2:2})

    # A bare integer is not a parameter container at all.
    with pytest.raises(ValueError) as excinfo:
        con.execute(insert, 2)
    assert str(excinfo.value) == 'parameters are of unsupported type'

def test_explicit_begin(con):
    """Explicit BEGIN statements, even repeated ones, must not raise."""
    # Three BEGINs in a row (the second with a trailing space), one commit.
    for statement in ('BEGIN', 'BEGIN ', 'BEGIN'):
        con.execute(statement)
    con.commit()
    # A further BEGIN/commit pair must work as well.
    con.execute('BEGIN')
    con.commit()

def test_row_factory_use(con):
    """A non-callable row_factory must not make execute() itself fail."""
    bogus_factory = 42  # not callable; no row is ever fetched below
    con.row_factory = bogus_factory
    con.execute('select 1')

def test_returning_blob_must_own_memory(con):
    """A blob returned by a user function must stay valid across GC runs."""
    import gc
    con.create_function("returnblob", 0, lambda: buffer("blob"))
    cursor = con.execute("select returnblob()")
    blob = cursor.fetchone()[0]
    for _ in range(5):
        gc.collect()
        # Re-read every byte after each collection.
        snapshot = (blob[0], blob[1], blob[2], blob[3])
        assert snapshot == ('b', 'l', 'o', 'b')
    # in theory 'blob' should be a read-write buffer
    # but it's not right now
    if not hasattr(_sqlite3, '_ffi'):
        blob[1] = 'X'
        snapshot = (blob[0], blob[1], blob[2], blob[3])
        assert snapshot == ('b', 'X', 'o', 'b')

def test_description_after_fetchall(con):
    """description is None before execute() and set after fetchall()."""
    cursor = con.cursor()
    assert cursor.description is None
    cursor.execute("select 42").fetchall()
    assert cursor.description is not None

def test_executemany_lastrowid(con):
    """lastrowid is None after executemany() but set after an INSERT."""
    cursor = con.cursor()
    cursor.execute("create table test(a)")
    rows = [[1], [2], [3]]
    cursor.executemany("insert into test values (?)", rows)
    assert cursor.lastrowid is None
    # issue 2682: the INSERT must be recognized even when the keyword is
    # followed by a newline or a tab instead of a single space.
    cursor.execute('''insert
                into test
                values (?)
                ''', (1, ))
    assert cursor.lastrowid is not None
    cursor.execute('''insert\t into test values (?) ''', (1, ))
    assert cursor.lastrowid is not None

def test_authorizer_bad_value(con):
    """An authorizer returning an invalid code makes execute() fail.

    The expected error message depends on the SQLite version.
    """
    def authorizer_cb(action, arg1, arg2, dbname, source):
        # 42 is the invalid return value this test feeds to the library.
        return 42

    con.set_authorizer(authorizer_cb)
    with pytest.raises(_sqlite3.OperationalError) as excinfo:
        con.execute('select 123')

    major, minor, micro = map(int, _sqlite3.sqlite_version.split('.')[:3])
    if (major, minor, micro) >= (3, 6, 14):
        expected = 'authorizer malfunction'
    else:
        expected = (
            "illegal return value (1) from the authorization function - "
            "should be SQLITE_OK, SQLITE_IGNORE, or SQLITE_DENY")
    assert str(excinfo.value) == expected

def test_issue1573(con):
    """A non-ASCII column alias shows up UTF-8 encoded in description."""
    cursor = con.cursor()
    cursor.execute(u'SELECT 1 as méil')
    first_column_name = cursor.description[0][0]
    assert first_column_name == u"méil".encode('utf-8')

def test_adapter_exception(con):
    """An adapter that raises is ignored and the value passes unchanged."""
    def failing_adapter(obj):
        raise ZeroDivisionError

    _sqlite3.register_adapter(int, failing_adapter)
    try:
        cursor = con.cursor()
        cursor.execute("select ?", (4,))
        value = cursor.fetchone()[0]
        # Adapter error is ignored, and parameter is passed as is.
        assert value == 4
        assert type(value) is int
    finally:
        # Remove the adapter registered above, whatever the outcome.
        registry_key = (int, _sqlite3.PrepareProtocol)
        del _sqlite3.adapters[registry_key]

def test_null_character(con):
    """SQL containing a NUL character is rejected with a ValueError.

    Skipped when '_sqlite3' has no '_ffi' attribute and the interpreter is
    older than 2.7.9.
    """
    if not hasattr(_sqlite3, '_ffi') and sys.version_info < (2, 7, 9):
        pytest.skip("_sqlite3 too old")
    expected = "the query contains a null character"

    # Through the connection's call interface: NUL first, then NUL last.
    for query in ("\0select 1", "select 1\0"):
        with pytest.raises(ValueError) as excinfo:
            con(query)
        assert str(excinfo.value) == expected

    # Through a cursor's execute(): NUL first, then NUL last.
    cur = con.cursor()
    for query in ("\0select 2", "select 2\0"):
        with pytest.raises(ValueError) as excinfo:
            cur.execute(query)
        assert str(excinfo.value) == expected

def test_close_in_del_ordering():
    """Closing a connection from an owner's __del__ must not raise.

    A wrapper object owning a connection is created, used once and dropped
    immediately; after garbage collection its __del__ must have run to
    completion, including the close() call on the connection.
    """
    import gc
    class SQLiteBackend(object):
        # Set to True by __del__ once close() has returned without raising.
        success = False
        def __init__(self):
            self.connection = _sqlite3.connect(":memory:")
        def close(self):
            self.connection.close()
        def __del__(self):
            self.close()
            SQLiteBackend.success = True
        def create_db_if_needed(self):
            # Creates a cursor on the owned connection, runs one statement,
            # then closes the cursor and commits.
            conn = self.connection
            cursor = conn.cursor()
            cursor.execute("""
                create table if not exists nameoftable(value text)
            """)
            cursor.close()
            conn.commit()
    # The instance is not bound to any name, so nothing references it once
    # the call has returned.
    SQLiteBackend().create_db_if_needed()
    gc.collect()
    gc.collect()
    assert SQLiteBackend.success