
|
# -*- coding: utf-8 -*-
"""Tests for _sqlite3.py"""
from __future__ import absolute_import
import pytest
import sys
_sqlite3 = pytest.importorskip('_sqlite3')
pypy_only = pytest.mark.skipif('__pypy__' not in sys.builtin_module_names,
reason="PyPy-only test")
@pytest.yield_fixture
def con():
con = _sqlite3.connect(':memory:')
yield con
con.close()
def test_list_ddl(con):
"""From issue996. Mostly just looking for lack of exceptions."""
cursor = con.cursor()
cursor.execute('CREATE TABLE foo (bar INTEGER)')
result = list(cursor)
assert result == []
cursor.execute('INSERT INTO foo (bar) VALUES (42)')
result = list(cursor)
assert result == []
cursor.execute('SELECT * FROM foo')
result = list(cursor)
assert result == [(42,)]
@pypy_only
def test_connect_takes_same_positional_args_as_Connection(con):
from inspect import getargspec
clsargs = getargspec(_sqlite3.Connection.__init__).args[1:] # ignore self
conargs = getargspec(_sqlite3.connect).args
assert clsargs == conargs
def test_total_changes_after_close(con):
con.close()
with pytest.raises(_sqlite3.ProgrammingError):
con.total_changes
def test_connection_check_init():
class Connection(_sqlite3.Connection):
def __init__(self, name):
pass
con = Connection(":memory:")
with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
con.cursor()
assert '__init__' in str(excinfo.value)
def test_cursor_check_init(con):
class Cursor(_sqlite3.Cursor):
def __init__(self, name):
pass
cur = Cursor(con)
with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
cur.execute('select 1')
assert '__init__' in str(excinfo.value)
def test_connection_after_close(con):
with pytest.raises(TypeError):
con()
con.close()
# raises ProgrammingError because should check closed before check args
with pytest.raises(_sqlite3.ProgrammingError):
con()
def test_cursor_iter(con):
cur = con.cursor()
with pytest.raises(StopIteration):
next(cur)
cur.execute('select 1')
next(cur)
with pytest.raises(StopIteration):
next(cur)
cur.execute('select 1')
con.commit()
next(cur)
with pytest.raises(StopIteration):
next(cur)
with pytest.raises(_sqlite3.ProgrammingError):
cur.executemany('select 1', [])
with pytest.raises(StopIteration):
next(cur)
cur.execute('select 1')
cur.execute('create table test(ing)')
with pytest.raises(StopIteration):
next(cur)
cur.execute('select 1')
cur.execute('insert into test values(1)')
con.commit()
with pytest.raises(StopIteration):
next(cur)
def test_cursor_after_close(con):
cur = con.execute('select 1')
cur.close()
con.close()
with pytest.raises(_sqlite3.ProgrammingError):
cur.close()
# raises ProgrammingError because should check closed before check args
with pytest.raises(_sqlite3.ProgrammingError):
cur.execute(1,2,3,4,5)
with pytest.raises(_sqlite3.ProgrammingError):
cur.executemany(1,2,3,4,5)
def test_connection_del(tmpdir):
"""For issue1325."""
import os
import gc
resource = pytest.importorskip('resource')
limit = resource.getrlimit(resource.RLIMIT_NOFILE)
try:
fds = 0
while True:
fds += 1
resource.setrlimit(resource.RLIMIT_NOFILE, (fds, limit[1]))
try:
for p in os.pipe(): os.close(p)
except OSError:
assert fds < 100
else:
break
def open_many(cleanup):
con = []
for i in range(3):
con.append(_sqlite3.connect(str(tmpdir.join('test.db'))))
if cleanup:
con[i] = None
gc.collect(); gc.collect()
with pytest.raises(_sqlite3.OperationalError):
open_many(False)
gc.collect(); gc.collect()
open_many(True)
finally:
resource.setrlimit(resource.RLIMIT_NOFILE, limit)
def test_on_conflict_rollback_executemany(con):
major, minor, micro = _sqlite3.sqlite_version.split('.')[:3]
if (int(major), int(minor), int(micro)) < (3, 2, 2):
pytest.skip("requires sqlite3 version >= 3.2.2")
con.execute("create table foo(x, unique(x) on conflict rollback)")
con.execute("insert into foo(x) values (1)")
try:
con.executemany("insert into foo(x) values (?)", [[1]])
except _sqlite3.DatabaseError:
pass
con.execute("insert into foo(x) values (2)")
try:
con.commit()
except _sqlite3.OperationalError:
pytest.fail("_sqlite3 knew nothing about the implicit ROLLBACK")
def test_statement_arg_checking(con):
with pytest.raises(_sqlite3.Warning) as e:
con(123)
assert str(e.value).startswith('SQL is of wrong type. Must be string')
with pytest.raises(ValueError) as e:
con.execute(123)
assert str(e.value).startswith('operation parameter must be str')
with pytest.raises(ValueError) as e:
con.executemany(123, 123)
assert str(e.value).startswith('operation parameter must be str')
with pytest.raises(ValueError) as e:
con.executescript(123)
assert str(e.value).startswith('script argument must be unicode')
def test_statement_param_checking(con):
con.execute('create table foo(x)')
con.execute('insert into foo(x) values (?)', [2])
con.execute('insert into foo(x) values (?)', (2,))
class seq(object):
def __len__(self):
return 1
def __getitem__(self, key):
return 2
con.execute('insert into foo(x) values (?)', seq())
del seq.__len__
with pytest.raises(_sqlite3.ProgrammingError):
con.execute('insert into foo(x) values (?)', seq())
with pytest.raises(_sqlite3.ProgrammingError):
con.execute('insert into foo(x) values (?)', {2:2})
with pytest.raises(ValueError) as e:
con.execute('insert into foo(x) values (?)', 2)
assert str(e.value) == 'parameters are of unsupported type'
def test_explicit_begin(con):
con.execute('BEGIN')
con.execute('BEGIN ')
con.execute('BEGIN')
con.commit()
con.execute('BEGIN')
con.commit()
def test_row_factory_use(con):
con.row_factory = 42
con.execute('select 1')
def test_returning_blob_must_own_memory(con):
import gc
con.create_function("returnblob", 0, lambda: buffer("blob"))
cur = con.execute("select returnblob()")
val = cur.fetchone()[0]
for i in range(5):
gc.collect()
got = (val[0], val[1], val[2], val[3])
assert got == ('b', 'l', 'o', 'b')
# in theory 'val' should be a read-write buffer
# but it's not right now
if not hasattr(_sqlite3, '_ffi'):
val[1] = 'X'
got = (val[0], val[1], val[2], val[3])
assert got == ('b', 'X', 'o', 'b')
def test_description_after_fetchall(con):
cur = con.cursor()
assert cur.description is None
cur.execute("select 42").fetchall()
assert cur.description is not None
def test_executemany_lastrowid(con):
cur = con.cursor()
cur.execute("create table test(a)")
cur.executemany("insert into test values (?)", [[1], [2], [3]])
assert cur.lastrowid is None
# issue 2682
cur.execute('''insert
into test
values (?)
''', (1, ))
assert cur.lastrowid is not None
cur.execute('''insert\t into test values (?) ''', (1, ))
assert cur.lastrowid is not None
def test_authorizer_bad_value(con):
def authorizer_cb(action, arg1, arg2, dbname, source):
return 42
con.set_authorizer(authorizer_cb)
with pytest.raises(_sqlite3.OperationalError) as e:
con.execute('select 123')
major, minor, micro = _sqlite3.sqlite_version.split('.')[:3]
if (int(major), int(minor), int(micro)) >= (3, 6, 14):
assert str(e.value) == 'authorizer malfunction'
else:
assert str(e.value) == \
("illegal return value (1) from the authorization function - "
"should be SQLITE_OK, SQLITE_IGNORE, or SQLITE_DENY")
def test_issue1573(con):
cur = con.cursor()
cur.execute(u'SELECT 1 as méil')
assert cur.description[0][0] == u"méil".encode('utf-8')
def test_adapter_exception(con):
def cast(obj):
raise ZeroDivisionError
_sqlite3.register_adapter(int, cast)
try:
cur = con.cursor()
cur.execute("select ?", (4,))
val = cur.fetchone()[0]
# Adapter error is ignored, and parameter is passed as is.
assert val == 4
assert type(val) is int
finally:
del _sqlite3.adapters[(int, _sqlite3.PrepareProtocol)]
def test_null_character(con):
if not hasattr(_sqlite3, '_ffi') and sys.version_info < (2, 7, 9):
pytest.skip("_sqlite3 too old")
with pytest.raises(ValueError) as excinfo:
con("\0select 1")
assert str(excinfo.value) == "the query contains a null character"
with pytest.raises(ValueError) as excinfo:
con("select 1\0")
assert str(excinfo.value) == "the query contains a null character"
cur = con.cursor()
with pytest.raises(ValueError) as excinfo:
cur.execute("\0select 2")
assert str(excinfo.value) == "the query contains a null character"
with pytest.raises(ValueError) as excinfo:
cur.execute("select 2\0")
assert str(excinfo.value) == "the query contains a null character"
def test_close_in_del_ordering():
import gc
class SQLiteBackend(object):
success = False
def __init__(self):
self.connection = _sqlite3.connect(":memory:")
def close(self):
self.connection.close()
def __del__(self):
self.close()
SQLiteBackend.success = True
def create_db_if_needed(self):
conn = self.connection
cursor = conn.cursor()
cursor.execute("""
create table if not exists nameoftable(value text)
""")
cursor.close()
conn.commit()
SQLiteBackend().create_db_if_needed()
gc.collect()
gc.collect()
assert SQLiteBackend.success
|