1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
import sqlite3
import unittest
from django.core.management.color import no_style
from django.db import connection, models
from django.test import TestCase
from ..models import Person, Tag
@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests.")
class SQLiteOperationsTests(TestCase):
    """Exercise sql_flush() and bulk_batch_size() of the SQLite backend ops."""

    def test_sql_flush(self):
        # A plain flush yields one DELETE per requested table.
        tables = [Person._meta.db_table, Tag._meta.db_table]
        statements = connection.ops.sql_flush(no_style(), tables)
        expected = [
            'DELETE FROM "backends_person";',
            'DELETE FROM "backends_tag";',
        ]
        self.assertEqual(statements, expected)

    def test_sql_flush_allow_cascade(self):
        # With allow_cascade=True the long-named m2m table is flushed as well.
        tables = [Person._meta.db_table, Tag._meta.db_table]
        statements = connection.ops.sql_flush(
            no_style(),
            tables,
            allow_cascade=True,
        )
        expected = [
            'DELETE FROM "backends_person";',
            'DELETE FROM "backends_tag";',
            'DELETE FROM "backends_verylongmodelnamezzzzzzzzzzzzzzzzzzzzzz'
            "zzzzzzzzzzzzzzzzzzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzz"
            'zzzzzzzzzzzzzzzzzzzzzzz";',
        ]
        # The tables are processed in an unordered set, so compare the
        # statements in sorted order.
        self.assertEqual(sorted(statements), expected)

    def test_sql_flush_sequences(self):
        # reset_sequences=True appends an UPDATE on "sqlite_sequence" after
        # the DELETE statements.
        tables = [Person._meta.db_table, Tag._meta.db_table]
        statements = connection.ops.sql_flush(
            no_style(),
            tables,
            reset_sequences=True,
        )
        expected = [
            'DELETE FROM "backends_person";',
            'DELETE FROM "backends_tag";',
            'UPDATE "sqlite_sequence" SET "seq" = 0 WHERE "name" IN '
            "('backends_person', 'backends_tag');",
        ]
        self.assertEqual(statements, expected)

    def test_sql_flush_sequences_allow_cascade(self):
        tables = [Person._meta.db_table, Tag._meta.db_table]
        statements = connection.ops.sql_flush(
            no_style(),
            tables,
            reset_sequences=True,
            allow_cascade=True,
        )
        expected_deletes = [
            'DELETE FROM "backends_person";',
            'DELETE FROM "backends_tag";',
            'DELETE FROM "backends_verylongmodelnamezzzzzzzzzzzzzzzzzzzzzz'
            "zzzzzzzzzzzzzzzzzzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzz"
            'zzzzzzzzzzzzzzzzzzzzzzz";',
        ]
        # The tables are processed in an unordered set, so every statement
        # except the last one is compared in sorted order.
        self.assertEqual(sorted(statements[:-1]), expected_deletes)

        # The last statement is the sequence reset; the order of the table
        # names inside its IN (...) list is not fixed, so check the prefix and
        # then each quoted name separately.
        sequence_reset = statements[-1]
        has_update_prefix = sequence_reset.startswith(
            'UPDATE "sqlite_sequence" SET "seq" = 0 WHERE "name" IN ('
        )
        self.assertIs(has_update_prefix, True)
        self.assertIn("'backends_person'", sequence_reset)
        self.assertIn("'backends_tag'", sequence_reset)
        long_m2m_name = (
            "'backends_verylongmodelnamezzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
            "zzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
            "zzz'"
        )
        self.assertIn(long_m2m_name, sequence_reset)

    def test_bulk_batch_size(self):
        # Without any fields the batch size is 1.
        self.assertEqual(connection.ops.bulk_batch_size([], [Person()]), 1)

        first_name = Person._meta.get_field("first_name")
        last_name = Person._meta.get_field("last_name")

        # One field: the batch size equals the query parameter limit.
        one_field_size = connection.ops.bulk_batch_size([first_name], [Person()])
        self.assertEqual(one_field_size, connection.features.max_query_params)

        # Two fields: the limit is split between them.
        two_field_size = connection.ops.bulk_batch_size(
            [first_name, last_name], [Person()]
        )
        self.assertEqual(
            two_field_size, connection.features.max_query_params // 2
        )

        # A composite primary key over two fields plus one more field is
        # expected to divide the limit by three.
        composite_pk = models.CompositePrimaryKey("first_name", "last_name")
        composite_pk.fields = [first_name, last_name]
        composite_size = connection.ops.bulk_batch_size(
            [composite_pk, first_name], [Person()]
        )
        self.assertEqual(
            composite_size, connection.features.max_query_params // 3
        )

    def test_bulk_batch_size_respects_variable_limit(self):
        first_name = Person._meta.get_field("first_name")
        last_name = Person._meta.get_field("last_name")

        def two_field_batch_size():
            # Recomputed on every call so a changed limit is picked up.
            return connection.ops.bulk_batch_size(
                [first_name, last_name], [Person()]
            )

        limit_id = sqlite3.SQLITE_LIMIT_VARIABLE_NUMBER
        original_limit = connection.features.max_query_params
        self.assertEqual(two_field_batch_size(), original_limit // 2)

        lowered_limit = min(42, original_limit)
        try:
            connection.connection.setlimit(limit_id, lowered_limit)
            self.assertEqual(two_field_batch_size(), lowered_limit // 2)
        finally:
            # Always put the original limit back, even if the assertion
            # above failed.
            connection.connection.setlimit(limit_id, original_limit)

        # Once the limit is restored, the batch size is back to its
        # original value.
        self.assertEqual(two_field_batch_size(), original_limit // 2)
|