File: test_operations.py

package info (click to toggle)
python-django 3%3A6.0~alpha1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 62,204 kB
  • sloc: python: 370,694; javascript: 19,376; xml: 211; makefile: 187; sh: 28
file content (141 lines) | stat: -rw-r--r-- 5,157 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import sqlite3
import unittest

from django.core.management.color import no_style
from django.db import connection, models
from django.test import TestCase

from ..models import Person, Tag


@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests.")
class SQLiteOperationsTests(TestCase):
    def test_sql_flush(self):
        self.assertEqual(
            connection.ops.sql_flush(
                no_style(),
                [Person._meta.db_table, Tag._meta.db_table],
            ),
            [
                'DELETE FROM "backends_person";',
                'DELETE FROM "backends_tag";',
            ],
        )

    def test_sql_flush_allow_cascade(self):
        statements = connection.ops.sql_flush(
            no_style(),
            [Person._meta.db_table, Tag._meta.db_table],
            allow_cascade=True,
        )
        self.assertEqual(
            # The tables are processed in an unordered set.
            sorted(statements),
            [
                'DELETE FROM "backends_person";',
                'DELETE FROM "backends_tag";',
                'DELETE FROM "backends_verylongmodelnamezzzzzzzzzzzzzzzzzzzzzz'
                "zzzzzzzzzzzzzzzzzzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzz"
                'zzzzzzzzzzzzzzzzzzzzzzz";',
            ],
        )

    def test_sql_flush_sequences(self):
        self.assertEqual(
            connection.ops.sql_flush(
                no_style(),
                [Person._meta.db_table, Tag._meta.db_table],
                reset_sequences=True,
            ),
            [
                'DELETE FROM "backends_person";',
                'DELETE FROM "backends_tag";',
                'UPDATE "sqlite_sequence" SET "seq" = 0 WHERE "name" IN '
                "('backends_person', 'backends_tag');",
            ],
        )

    def test_sql_flush_sequences_allow_cascade(self):
        statements = connection.ops.sql_flush(
            no_style(),
            [Person._meta.db_table, Tag._meta.db_table],
            reset_sequences=True,
            allow_cascade=True,
        )
        self.assertEqual(
            # The tables are processed in an unordered set.
            sorted(statements[:-1]),
            [
                'DELETE FROM "backends_person";',
                'DELETE FROM "backends_tag";',
                'DELETE FROM "backends_verylongmodelnamezzzzzzzzzzzzzzzzzzzzzz'
                "zzzzzzzzzzzzzzzzzzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzz"
                'zzzzzzzzzzzzzzzzzzzzzzz";',
            ],
        )
        self.assertIs(
            statements[-1].startswith(
                'UPDATE "sqlite_sequence" SET "seq" = 0 WHERE "name" IN ('
            ),
            True,
        )
        self.assertIn("'backends_person'", statements[-1])
        self.assertIn("'backends_tag'", statements[-1])
        self.assertIn(
            "'backends_verylongmodelnamezzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
            "zzzz_m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
            "zzz'",
            statements[-1],
        )

    def test_bulk_batch_size(self):
        self.assertEqual(connection.ops.bulk_batch_size([], [Person()]), 1)
        first_name_field = Person._meta.get_field("first_name")
        last_name_field = Person._meta.get_field("last_name")
        self.assertEqual(
            connection.ops.bulk_batch_size([first_name_field], [Person()]),
            connection.features.max_query_params,
        )
        self.assertEqual(
            connection.ops.bulk_batch_size(
                [first_name_field, last_name_field], [Person()]
            ),
            connection.features.max_query_params // 2,
        )
        composite_pk = models.CompositePrimaryKey("first_name", "last_name")
        composite_pk.fields = [first_name_field, last_name_field]
        self.assertEqual(
            connection.ops.bulk_batch_size(
                [composite_pk, first_name_field], [Person()]
            ),
            connection.features.max_query_params // 3,
        )

    def test_bulk_batch_size_respects_variable_limit(self):
        first_name_field = Person._meta.get_field("first_name")
        last_name_field = Person._meta.get_field("last_name")
        limit_name = sqlite3.SQLITE_LIMIT_VARIABLE_NUMBER
        current_limit = connection.features.max_query_params
        self.assertEqual(
            connection.ops.bulk_batch_size(
                [first_name_field, last_name_field], [Person()]
            ),
            current_limit // 2,
        )
        new_limit = min(42, current_limit)
        try:
            connection.connection.setlimit(limit_name, new_limit)
            self.assertEqual(
                connection.ops.bulk_batch_size(
                    [first_name_field, last_name_field], [Person()]
                ),
                new_limit // 2,
            )
        finally:
            connection.connection.setlimit(limit_name, current_limit)
        self.assertEqual(
            connection.ops.bulk_batch_size(
                [first_name_field, last_name_field], [Person()]
            ),
            current_limit // 2,
        )