1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
"""Tests for django.db.backends.utils"""
from decimal import Decimal, Rounded
from django.db import NotSupportedError, connection
from django.db.backends.utils import (
format_number,
split_identifier,
split_tzname_delta,
truncate_name,
)
from django.test import (
SimpleTestCase,
TransactionTestCase,
skipIfDBFeature,
skipUnlessDBFeature,
)
class TestUtils(SimpleTestCase):
    """Checks for the standalone helpers imported from django.db.backends.utils."""

    def test_truncate_name(self):
        """Names within the limit pass through; longer ones are shortened.

        Each case is (positional arguments for truncate_name, expected result).
        """
        cases = [
            (("some_table", 10), "some_table"),
            (("some_long_table", 10), "some_la38a"),
            (("some_long_table", 10, 3), "some_loa38"),
            # Without a length limit the name is returned as given.
            (("some_long_table",), "some_long_table"),
            # "user"."table" syntax
            (('username"."some_table', 10), 'username"."some_table'),
            (('username"."some_long_table', 10), 'username"."some_la38a'),
            (('username"."some_long_table', 10, 3), 'username"."some_loa38'),
        ]
        for call_args, expected in cases:
            self.assertEqual(truncate_name(*call_args), expected)

    def test_split_identifier(self):
        """An identifier splits into a (namespace, name) pair without quotes."""
        cases = [
            ("some_table", ("", "some_table")),
            ('"some_table"', ("", "some_table")),
            ('namespace"."some_table', ("namespace", "some_table")),
            ('"namespace"."some_table"', ("namespace", "some_table")),
        ]
        for identifier, expected in cases:
            self.assertEqual(split_identifier(identifier), expected)

    def test_format_number(self):
        """format_number() renders a Decimal for the given digit/place limits.

        When no number of places is given and the value does not fit in the
        allowed digits, decimal.Rounded is expected to be raised.
        """

        def check(raw, max_d, places, expected):
            formatted = format_number(Decimal(raw), max_d, places)
            self.assertEqual(formatted, expected)

        # (value, max digits, decimal places, expected string)
        exact_cases = (
            ("0", 12, 3, "0.000"),
            ("0", 12, 8, "0.00000000"),
            ("1", 12, 9, "1.000000000"),
            ("0.00000000", 12, 8, "0.00000000"),
            ("0.000000004", 12, 8, "0.00000000"),
            ("0.000000008", 12, 8, "0.00000001"),
            ("0.000000000000000000999", 10, 8, "0.00000000"),
            ("0.1234567890", 12, 10, "0.1234567890"),
            ("0.1234567890", 12, 9, "0.123456789"),
            ("0.1234567890", 12, 8, "0.12345679"),
            ("0.1234567890", 12, 5, "0.12346"),
            ("0.1234567890", 12, 3, "0.123"),
            ("0.1234567890", 12, 1, "0.1"),
            ("0.1234567890", 12, 0, "0"),
            ("0.1234567890", None, 0, "0"),
            ("1234567890.1234567890", None, 0, "1234567890"),
            ("1234567890.1234567890", None, 2, "1234567890.12"),
            ("0.1234", 5, None, "0.1234"),
            ("123.12", 5, None, "123.12"),
        )
        for case in exact_cases:
            check(*case)

        # These values need rounding to fit in 5 digits, which must raise.
        rounded_cases = (
            ("0.1234567890", "0.12346"),
            ("1234567890.1234", "1234600000"),
        )
        for raw, unreachable_result in rounded_cases:
            with self.assertRaises(Rounded):
                check(raw, 5, None, unreachable_result)

    def test_split_tzname_delta(self):
        """A trailing +HH:MM / -HH:MM offset is split off the timezone name.

        Expected values are (name, sign, offset); sign and offset are None
        when the input carries no such offset.
        """
        expectations = {
            "Asia/Ust+Nera": ("Asia/Ust+Nera", None, None),
            "Asia/Ust-Nera": ("Asia/Ust-Nera", None, None),
            "Asia/Ust+Nera-02:00": ("Asia/Ust+Nera", "-", "02:00"),
            "Asia/Ust-Nera+05:00": ("Asia/Ust-Nera", "+", "05:00"),
            "America/Coral_Harbour-01:00": ("America/Coral_Harbour", "-", "01:00"),
            "America/Coral_Harbour+02:30": ("America/Coral_Harbour", "+", "02:30"),
            "UTC+15:00": ("UTC", "+", "15:00"),
            "UTC-04:43": ("UTC", "-", "04:43"),
            "UTC": ("UTC", None, None),
            "UTC+1": ("UTC+1", None, None),
        }
        for tzname, expected in expectations.items():
            with self.subTest(tzname=tzname):
                self.assertEqual(split_tzname_delta(tzname), expected)
class CursorWrapperTests(TransactionTestCase):
    """Tests for calling stored procedures through the cursor's callproc()."""

    available_apps = []

    def _test_procedure(self, procedure_sql, params, param_types, kparams=None):
        """Create ``test_procedure``, call it, and always drop it afterwards.

        ``procedure_sql`` is executed to create the procedure, ``params`` and
        ``kparams`` are forwarded to ``cursor.callproc()``, and ``param_types``
        is passed to ``editor.remove_procedure()`` when dropping it.
        """
        with connection.cursor() as cursor:
            cursor.execute(procedure_sql)
        try:
            # Use a new cursor because in MySQL a procedure can't be used in the
            # same cursor in which it was created.
            with connection.cursor() as cursor:
                cursor.callproc("test_procedure", params, kparams)
        finally:
            # Drop the procedure even if the call failed so that it doesn't
            # leak into other tests that create a procedure with the same name.
            with connection.schema_editor() as editor:
                editor.remove_procedure("test_procedure", param_types)

    @skipUnlessDBFeature("create_test_procedure_without_params_sql")
    def test_callproc_without_params(self):
        """A procedure without parameters can be called with an empty list."""
        self._test_procedure(
            connection.features.create_test_procedure_without_params_sql, [], []
        )

    @skipUnlessDBFeature("create_test_procedure_with_int_param_sql")
    def test_callproc_with_int_params(self):
        """A procedure with one integer parameter can be called positionally."""
        self._test_procedure(
            connection.features.create_test_procedure_with_int_param_sql,
            [1],
            ["INTEGER"],
        )

    @skipUnlessDBFeature(
        "create_test_procedure_with_int_param_sql", "supports_callproc_kwargs"
    )
    def test_callproc_kparams(self):
        """The integer parameter can be supplied as a keyword parameter."""
        self._test_procedure(
            connection.features.create_test_procedure_with_int_param_sql,
            [],
            ["INTEGER"],
            {"P_I": 1},
        )

    @skipIfDBFeature("supports_callproc_kwargs")
    def test_unsupported_callproc_kparams_raises_error(self):
        """Keyword parameters raise NotSupportedError on backends without them."""
        msg = (
            "Keyword parameters for callproc are not supported on this database "
            "backend."
        )
        with self.assertRaisesMessage(NotSupportedError, msg):
            with connection.cursor() as cursor:
                cursor.callproc("test_procedure", [], {"P_I": 1})
|