1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
|
import os
import re
import tempfile
import threading
import unittest
from pathlib import Path
from sqlite3 import dbapi2
from unittest import mock
from django.core.exceptions import ImproperlyConfigured
from django.db import NotSupportedError, connection, transaction
from django.db.models import Aggregate, Avg, CharField, StdDev, Sum, Variance
from django.db.utils import ConnectionHandler
from django.test import (
TestCase, TransactionTestCase, override_settings, skipIfDBFeature,
)
from django.test.utils import isolate_apps
from ..models import Author, Item, Object, Square
try:
from django.db.backends.sqlite3.base import check_sqlite_version
except ImproperlyConfigured:
# Ignore "SQLite is too old" when running tests on another database.
pass
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
class Tests(TestCase):
longMessage = True
def test_check_sqlite_version(self):
msg = 'SQLite 3.9.0 or later is required (found 3.8.11.1).'
with mock.patch.object(dbapi2, 'sqlite_version_info', (3, 8, 11, 1)), \
mock.patch.object(dbapi2, 'sqlite_version', '3.8.11.1'), \
self.assertRaisesMessage(ImproperlyConfigured, msg):
check_sqlite_version()
def test_aggregation(self):
"""Raise NotSupportedError when aggregating on date/time fields."""
for aggregate in (Sum, Avg, Variance, StdDev):
with self.assertRaises(NotSupportedError):
Item.objects.all().aggregate(aggregate('time'))
with self.assertRaises(NotSupportedError):
Item.objects.all().aggregate(aggregate('date'))
with self.assertRaises(NotSupportedError):
Item.objects.all().aggregate(aggregate('last_modified'))
with self.assertRaises(NotSupportedError):
Item.objects.all().aggregate(
**{'complex': aggregate('last_modified') + aggregate('last_modified')}
)
def test_distinct_aggregation(self):
class DistinctAggregate(Aggregate):
allow_distinct = True
aggregate = DistinctAggregate('first', 'second', distinct=True)
msg = (
"SQLite doesn't support DISTINCT on aggregate functions accepting "
"multiple arguments."
)
with self.assertRaisesMessage(NotSupportedError, msg):
connection.ops.check_expression_support(aggregate)
def test_distinct_aggregation_multiple_args_no_distinct(self):
# Aggregate functions accept multiple arguments when DISTINCT isn't
# used, e.g. GROUP_CONCAT().
class DistinctAggregate(Aggregate):
allow_distinct = True
aggregate = DistinctAggregate('first', 'second', distinct=False)
connection.ops.check_expression_support(aggregate)
def test_memory_db_test_name(self):
"""A named in-memory db should be allowed where supported."""
from django.db.backends.sqlite3.base import DatabaseWrapper
settings_dict = {
'TEST': {
'NAME': 'file:memorydb_test?mode=memory&cache=shared',
}
}
creation = DatabaseWrapper(settings_dict).creation
self.assertEqual(creation._get_test_db_name(), creation.connection.settings_dict['TEST']['NAME'])
def test_regexp_function(self):
tests = (
('test', r'[0-9]+', False),
('test', r'[a-z]+', True),
('test', None, None),
(None, r'[a-z]+', None),
(None, None, None),
)
for string, pattern, expected in tests:
with self.subTest((string, pattern)):
with connection.cursor() as cursor:
cursor.execute('SELECT %s REGEXP %s', [string, pattern])
value = cursor.fetchone()[0]
value = bool(value) if value in {0, 1} else value
self.assertIs(value, expected)
def test_pathlib_name(self):
with tempfile.TemporaryDirectory() as tmp:
settings_dict = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': Path(tmp) / 'test.db',
},
}
connections = ConnectionHandler(settings_dict)
connections['default'].ensure_connection()
connections['default'].close()
self.assertTrue(os.path.isfile(os.path.join(tmp, 'test.db')))
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
@isolate_apps('backends')
class SchemaTests(TransactionTestCase):
available_apps = ['backends']
def test_autoincrement(self):
"""
auto_increment fields are created with the AUTOINCREMENT keyword
in order to be monotonically increasing (#10164).
"""
with connection.schema_editor(collect_sql=True) as editor:
editor.create_model(Square)
statements = editor.collected_sql
match = re.search('"id" ([^,]+),', statements[0])
self.assertIsNotNone(match)
self.assertEqual(
'integer NOT NULL PRIMARY KEY AUTOINCREMENT',
match[1],
'Wrong SQL used to create an auto-increment column on SQLite'
)
def test_disable_constraint_checking_failure_disallowed(self):
"""
SQLite schema editor is not usable within an outer transaction if
foreign key constraint checks are not disabled beforehand.
"""
msg = (
'SQLite schema editor cannot be used while foreign key '
'constraint checks are enabled. Make sure to disable them '
'before entering a transaction.atomic() context because '
'SQLite does not support disabling them in the middle of '
'a multi-statement transaction.'
)
with self.assertRaisesMessage(NotSupportedError, msg):
with transaction.atomic(), connection.schema_editor(atomic=True):
pass
def test_constraint_checks_disabled_atomic_allowed(self):
"""
SQLite schema editor is usable within an outer transaction as long as
foreign key constraints checks are disabled beforehand.
"""
def constraint_checks_enabled():
with connection.cursor() as cursor:
return bool(cursor.execute('PRAGMA foreign_keys').fetchone()[0])
with connection.constraint_checks_disabled(), transaction.atomic():
with connection.schema_editor(atomic=True):
self.assertFalse(constraint_checks_enabled())
self.assertFalse(constraint_checks_enabled())
self.assertTrue(constraint_checks_enabled())
@skipIfDBFeature('supports_atomic_references_rename')
def test_field_rename_inside_atomic_block(self):
"""
NotImplementedError is raised when a model field rename is attempted
inside an atomic block.
"""
new_field = CharField(max_length=255, unique=True)
new_field.set_attributes_from_name('renamed')
msg = (
"Renaming the 'backends_author'.'name' column while in a "
"transaction is not supported on SQLite < 3.26 because it would "
"break referential integrity. Try adding `atomic = False` to the "
"Migration class."
)
with self.assertRaisesMessage(NotSupportedError, msg):
with connection.schema_editor(atomic=True) as editor:
editor.alter_field(Author, Author._meta.get_field('name'), new_field)
@skipIfDBFeature('supports_atomic_references_rename')
def test_table_rename_inside_atomic_block(self):
"""
NotImplementedError is raised when a table rename is attempted inside
an atomic block.
"""
msg = (
"Renaming the 'backends_author' table while in a transaction is "
"not supported on SQLite < 3.26 because it would break referential "
"integrity. Try adding `atomic = False` to the Migration class."
)
with self.assertRaisesMessage(NotSupportedError, msg):
with connection.schema_editor(atomic=True) as editor:
editor.alter_db_table(Author, "backends_author", "renamed_table")
@unittest.skipUnless(connection.vendor == 'sqlite', 'Test only for SQLite')
@override_settings(DEBUG=True)
class LastExecutedQueryTest(TestCase):
def test_no_interpolation(self):
# This shouldn't raise an exception (#17158)
query = "SELECT strftime('%Y', 'now');"
with connection.cursor() as cursor:
cursor.execute(query)
self.assertEqual(connection.queries[-1]['sql'], query)
def test_parameter_quoting(self):
# The implementation of last_executed_queries isn't optimal. It's
# worth testing that parameters are quoted (#14091).
query = "SELECT %s"
params = ["\"'\\"]
with connection.cursor() as cursor:
cursor.execute(query, params)
# Note that the single quote is repeated
substituted = "SELECT '\"''\\'"
self.assertEqual(connection.queries[-1]['sql'], substituted)
def test_large_number_of_parameters(self):
# If SQLITE_MAX_VARIABLE_NUMBER (default = 999) has been changed to be
# greater than SQLITE_MAX_COLUMN (default = 2000), last_executed_query
# can hit the SQLITE_MAX_COLUMN limit (#26063).
with connection.cursor() as cursor:
sql = "SELECT MAX(%s)" % ", ".join(["%s"] * 2001)
params = list(range(2001))
# This should not raise an exception.
cursor.db.ops.last_executed_query(cursor.cursor, sql, params)
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
class EscapingChecks(TestCase):
"""
All tests in this test case are also run with settings.DEBUG=True in
EscapingChecksDebug test case, to also test CursorDebugWrapper.
"""
def test_parameter_escaping(self):
# '%s' escaping support for sqlite3 (#13648).
with connection.cursor() as cursor:
cursor.execute("select strftime('%s', date('now'))")
response = cursor.fetchall()[0][0]
# response should be an non-zero integer
self.assertTrue(int(response))
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
@override_settings(DEBUG=True)
class EscapingChecksDebug(EscapingChecks):
pass
@unittest.skipUnless(connection.vendor == 'sqlite', 'SQLite tests')
class ThreadSharing(TransactionTestCase):
available_apps = ['backends']
def test_database_sharing_in_threads(self):
def create_object():
Object.objects.create()
create_object()
thread = threading.Thread(target=create_object)
thread.start()
thread.join()
self.assertEqual(Object.objects.count(), 2)
|