1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
|
# -*- coding: utf-8 -*-
"""Tests for _sqlite3.py"""
from __future__ import absolute_import
import pytest
import sys
# Skip this whole module when the _sqlite3 module cannot be imported.
_sqlite3 = pytest.importorskip('_sqlite3')

# Marker that skips a test unless '__pypy__' is listed among the
# interpreter's built-in modules.
pypy_only = pytest.mark.skipif('__pypy__' not in sys.builtin_module_names,
    reason="PyPy-only test")
@pytest.yield_fixture
def con():
    """Yield a fresh in-memory database connection and close it afterwards."""
    connection = _sqlite3.connect(':memory:')
    yield connection
    # Everything after the yield is the fixture's teardown.
    connection.close()
def test_list_ddl(con):
    """From issue996.  Mostly just looking for lack of exceptions."""
    cur = con.cursor()
    # Each statement is paired with the rows that iterating the cursor
    # must produce immediately after executing it.
    steps = [
        ('CREATE TABLE foo (bar INTEGER)', []),
        ('INSERT INTO foo (bar) VALUES (42)', []),
        ('SELECT * FROM foo', [(42,)]),
    ]
    for sql, expected_rows in steps:
        cur.execute(sql)
        rows = list(cur)
        assert rows == expected_rows
@pypy_only
def test_connect_takes_same_positional_args_as_Connection(con):
    """connect() and Connection.__init__ must name the same arguments."""
    import inspect
    init_spec = inspect.getargspec(_sqlite3.Connection.__init__)
    connect_spec = inspect.getargspec(_sqlite3.connect)
    # Drop the leading 'self' so both lists describe caller-visible args.
    init_args = init_spec.args[1:]
    assert init_args == connect_spec.args
def test_total_changes_after_close(con):
    """Reading total_changes from a closed connection must raise."""
    con.close()
    expected_error = _sqlite3.ProgrammingError
    with pytest.raises(expected_error):
        con.total_changes  # the attribute access itself is what raises
def test_connection_check_init():
    """Using a Connection whose base __init__ never ran must be rejected."""
    class Connection(_sqlite3.Connection):
        def __init__(self, name):
            # Deliberately does not call the base class __init__.
            pass

    uninitialized = Connection(":memory:")
    with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
        uninitialized.cursor()
    # The error message is expected to point at the missing __init__ call.
    message = str(excinfo.value)
    assert '__init__' in message
def test_cursor_check_init(con):
    """Using a Cursor whose base __init__ never ran must be rejected."""
    class Cursor(_sqlite3.Cursor):
        def __init__(self, name):
            # Deliberately does not call the base class __init__.
            pass

    uninitialized = Cursor(con)
    with pytest.raises(_sqlite3.ProgrammingError) as excinfo:
        uninitialized.execute('select 1')
    # The error message is expected to point at the missing __init__ call.
    message = str(excinfo.value)
    assert '__init__' in message
def test_connection_after_close(con):
    """Calling a connection without arguments, before and after close()."""
    # While the connection is open, the missing argument is a TypeError.
    open_error = TypeError
    with pytest.raises(open_error):
        con()

    con.close()

    # raises ProgrammingError because should check closed before check args
    closed_error = _sqlite3.ProgrammingError
    with pytest.raises(closed_error):
        con()
def test_cursor_iter(con):
    """Check in which states next() on a cursor yields a row or stops."""
    cur = con.cursor()

    def assert_exhausted():
        # The cursor must have no (further) row to hand out.
        with pytest.raises(StopIteration):
            next(cur)

    # Nothing has been executed yet.
    assert_exhausted()

    # A single-row SELECT yields exactly one row.
    cur.execute('select 1')
    next(cur)
    assert_exhausted()

    # The pending row is still delivered after a commit.
    cur.execute('select 1')
    con.commit()
    next(cur)
    assert_exhausted()

    # executemany() with a SELECT is rejected and leaves nothing to fetch.
    with pytest.raises(_sqlite3.ProgrammingError):
        cur.executemany('select 1', [])
    assert_exhausted()

    # Executing another statement discards the row of the earlier SELECT.
    cur.execute('select 1')
    cur.execute('create table test(ing)')
    assert_exhausted()

    cur.execute('select 1')
    cur.execute('insert into test values(1)')
    con.commit()
    assert_exhausted()
def test_cursor_after_close(con):
    """Every cursor operation fails once cursor and connection are closed."""
    cur = con.execute('select 1')
    cur.close()
    con.close()
    with pytest.raises(_sqlite3.ProgrammingError):
        cur.close()
    # raises ProgrammingError because should check closed before check args
    bogus_args = (1, 2, 3, 4, 5)
    for method_name in ('execute', 'executemany'):
        with pytest.raises(_sqlite3.ProgrammingError):
            getattr(cur, method_name)(*bogus_args)
def test_connection_del(tmpdir):
    """For issue1325.

    Lowers the open-file limit to the smallest value at which a pipe can
    still be created, then opens several connections to one database file
    with and without dropping them in between.
    """
    import os
    import gc
    # Skips the test where the 'resource' module cannot be imported.
    resource = pytest.importorskip('resource')

    limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        # Raise the soft limit one descriptor at a time until os.pipe()
        # succeeds; both pipe ends are closed again right away.
        fds = 0
        while True:
            fds += 1
            resource.setrlimit(resource.RLIMIT_NOFILE, (fds, limit[1]))
            try:
                for p in os.pipe(): os.close(p)
            except OSError:
                # Stops the search from running forever.
                assert fds < 100
            else:
                break

        def open_many(cleanup):
            # Open three connections to the same file.  With cleanup, each
            # reference is dropped and collected before the next connect.
            con = []
            for i in range(3):
                con.append(_sqlite3.connect(str(tmpdir.join('test.db'))))
                if cleanup:
                    con[i] = None
                    gc.collect(); gc.collect()

        # Keeping all three connections alive is expected to hit the limit.
        with pytest.raises(_sqlite3.OperationalError):
            open_many(False)
        # Presumably reclaims the connections left over from the failed run
        # before trying again -- TODO confirm.
        gc.collect(); gc.collect()
        open_many(True)
    finally:
        # Always restore the limit that was in effect before the test.
        resource.setrlimit(resource.RLIMIT_NOFILE, limit)
def test_on_conflict_rollback_executemany(con):
    """commit() must still work after a conflict triggered a ROLLBACK."""
    major, minor, micro = _sqlite3.sqlite_version.split('.')[:3]
    library_version = (int(major), int(minor), int(micro))
    if library_version < (3, 2, 2):
        pytest.skip("requires sqlite3 version >= 3.2.2")

    con.execute("create table foo(x, unique(x) on conflict rollback)")
    con.execute("insert into foo(x) values (1)")
    try:
        # Inserts the value 1 a second time, against the unique(x) clause.
        con.executemany("insert into foo(x) values (?)", [[1]])
    except _sqlite3.DatabaseError:
        # The failure itself is not what this test is about.
        pass

    con.execute("insert into foo(x) values (2)")
    try:
        con.commit()
    except _sqlite3.OperationalError:
        pytest.fail("_sqlite3 knew nothing about the implicit ROLLBACK")
def test_statement_arg_checking(con):
    """A non-string statement is rejected by every execution entry point."""
    # (expected exception, call passing 123 as the statement, message prefix)
    cases = [
        (_sqlite3.Warning,
         lambda: con(123),
         'SQL is of wrong type. Must be string'),
        (ValueError,
         lambda: con.execute(123),
         'operation parameter must be str'),
        (ValueError,
         lambda: con.executemany(123, 123),
         'operation parameter must be str'),
        (ValueError,
         lambda: con.executescript(123),
         'script argument must be unicode'),
    ]
    for expected_error, call, prefix in cases:
        with pytest.raises(expected_error) as e:
            call()
        assert str(e.value).startswith(prefix)
def test_statement_param_checking(con):
    """Check which kinds of objects are accepted as statement parameters."""
    con.execute('create table foo(x)')

    def insert(params):
        return con.execute('insert into foo(x) values (?)', params)

    # Lists and tuples are accepted.
    insert([2])
    insert((2,))

    class seq(object):
        def __len__(self):
            return 1

        def __getitem__(self, key):
            return 2

    # So is any object offering both __len__ and __getitem__ ...
    insert(seq())
    # ... but no longer once __len__ has been removed.
    del seq.__len__
    with pytest.raises(_sqlite3.ProgrammingError):
        insert(seq())

    # A dict does not match the positional '?' placeholder.
    with pytest.raises(_sqlite3.ProgrammingError):
        insert({2:2})

    # A bare integer is not a parameter container at all.
    with pytest.raises(ValueError) as e:
        insert(2)
    assert str(e.value) == 'parameters are of unsupported type'
def test_explicit_begin(con):
    """Explicit BEGIN statements, repeated or not, followed by commit()."""
    # Three BEGINs in a row; the second carries a trailing space.
    for statement in ('BEGIN', 'BEGIN ', 'BEGIN'):
        con.execute(statement)
    con.commit()

    # A single BEGIN after the commit must work as well.
    con.execute('BEGIN')
    con.commit()
def test_row_factory_use(con):
    """Executing a statement must not fail when row_factory is not callable."""
    # 42 is not callable; no row is fetched here, only execute() is run.
    con.row_factory = 42
    con.execute('select 1')
def test_returning_blob_must_own_memory(con):
    """A blob returned by a user function stays intact across collections."""
    import gc
    con.create_function("returnblob", 0, lambda: buffer("blob"))
    cur = con.execute("select returnblob()")
    val = cur.fetchone()[0]

    def first_four():
        return (val[0], val[1], val[2], val[3])

    # Repeated collections must not change what the fetched value contains.
    for _ in range(5):
        gc.collect()
        got = first_four()
        assert got == ('b', 'l', 'o', 'b')
    # in theory 'val' should be a read-write buffer
    # but it's not right now
    if not hasattr(_sqlite3, '_ffi'):
        val[1] = 'X'
        got = first_four()
        assert got == ('b', 'X', 'o', 'b')
def test_description_after_fetchall(con):
    """description is None at first and stays set after fetchall()."""
    cur = con.cursor()
    assert cur.description is None
    executed = cur.execute("select 42")
    executed.fetchall()
    # Consuming all rows must not reset the description.
    assert cur.description is not None
def test_executemany_lastrowid(con):
    """Check lastrowid after executemany() and after single INSERTs."""
    cur = con.cursor()
    cur.execute("create table test(a)")
    cur.executemany("insert into test values (?)", [[1], [2], [3]])
    # executemany() is expected to leave lastrowid unset.
    assert cur.lastrowid is None
    # issue 2682
    # Here the INSERT keyword is followed by a newline, and further down by
    # a tab, rather than a plain space; lastrowid must be set in both cases.
    cur.execute('''insert
into test
values (?)
''', (1, ))
    assert cur.lastrowid is not None
    cur.execute('''insert\t into test values (?) ''', (1, ))
    assert cur.lastrowid is not None
def test_authorizer_bad_value(con):
    """An authorizer callback returning 42 makes statements fail."""
    def authorizer_cb(action, arg1, arg2, dbname, source):
        return 42

    con.set_authorizer(authorizer_cb)
    with pytest.raises(_sqlite3.OperationalError) as e:
        con.execute('select 123')

    # The expected message depends on the SQLite library version.
    major, minor, micro = _sqlite3.sqlite_version.split('.')[:3]
    library_version = (int(major), int(minor), int(micro))
    if library_version >= (3, 6, 14):
        expected = 'authorizer malfunction'
    else:
        expected = (
            "illegal return value (1) from the authorization function - "
            "should be SQLITE_OK, SQLITE_IGNORE, or SQLITE_DENY")
    assert str(e.value) == expected
def test_issue1573(con):
    """A non-ASCII column alias is reported UTF-8 encoded in description."""
    cur = con.cursor()
    cur.execute(u'SELECT 1 as méil')
    # The first item of a description entry is the column name.
    reported_name = cur.description[0][0]
    expected_name = u"méil".encode('utf-8')
    assert reported_name == expected_name
def test_adapter_exception(con):
    """An adapter that raises must not prevent the parameter being bound."""
    def failing_adapter(obj):
        raise ZeroDivisionError

    _sqlite3.register_adapter(int, failing_adapter)
    try:
        cur = con.cursor()
        cur.execute("select ?", (4,))
        row = cur.fetchone()
        val = row[0]
        # Adapter error is ignored, and parameter is passed as is.
        assert val == 4
        assert type(val) is int
    finally:
        # Unregister the adapter again, whatever the outcome above.
        del _sqlite3.adapters[(int, _sqlite3.PrepareProtocol)]
def test_null_character(con):
    """Queries with a leading or trailing NUL character are rejected."""
    if not hasattr(_sqlite3, '_ffi') and sys.version_info < (2, 7, 9):
        pytest.skip("_sqlite3 too old")

    # First by calling the connection directly ...
    for query in ("\0select 1", "select 1\0"):
        with pytest.raises(ValueError) as excinfo:
            con(query)
        assert str(excinfo.value) == "the query contains a null character"

    # ... then through a cursor's execute().
    cur = con.cursor()
    for query in ("\0select 2", "select 2\0"):
        with pytest.raises(ValueError) as excinfo:
            cur.execute(query)
        assert str(excinfo.value) == "the query contains a null character"
def test_close_in_del_ordering():
    """Closing the connection from an owner's __del__ must succeed."""
    import gc

    class SQLiteBackend(object):
        # Set to True once __del__ has run through without an exception.
        success = False

        def __init__(self):
            self.connection = _sqlite3.connect(":memory:")

        def close(self):
            self.connection.close()

        def __del__(self):
            self.close()
            SQLiteBackend.success = True

        def create_db_if_needed(self):
            db = self.connection
            cur = db.cursor()
            cur.execute("""
create table if not exists nameoftable(value text)
""")
            cur.close()
            db.commit()

    # The backend is never stored, so it becomes garbage right after use.
    SQLiteBackend().create_db_if_needed()
    for _ in range(2):
        gc.collect()
    assert SQLiteBackend.success
|