# Tests for raw SQL queries executed through Manager.raw() and RawQuerySet.
from datetime import date
from decimal import Decimal
from django.core.exceptions import FieldDoesNotExist
from django.db.models.query import RawQuerySet
from django.test import TestCase, skipUnlessDBFeature
from .models import (
Author, Book, BookFkAsPk, Coffee, FriendlyAuthor, MixedCaseIDColumn,
Reviewer,
)
class RawQueryTests(TestCase):
    """
    Tests for raw SQL queries issued through ``Manager.raw()`` and for the
    ``RawQuerySet`` objects those calls return.
    """

    @classmethod
    def setUpTestData(cls):
        """
        Create the authors, books, coffees and reviewers shared by the tests.
        """
        cls.a1 = Author.objects.create(first_name='Joe', last_name='Smith', dob=date(1950, 9, 20))
        cls.a2 = Author.objects.create(first_name='Jill', last_name='Doe', dob=date(1920, 4, 2))
        cls.a3 = Author.objects.create(first_name='Bob', last_name='Smith', dob=date(1986, 1, 25))
        cls.a4 = Author.objects.create(first_name='Bill', last_name='Jones', dob=date(1932, 5, 10))
        cls.b1 = Book.objects.create(
            title='The awesome book', author=cls.a1, paperback=False,
            opening_line='It was a bright cold day in April and the clocks were striking thirteen.',
        )
        cls.b2 = Book.objects.create(
            title='The horrible book', author=cls.a1, paperback=True,
            opening_line=(
                'On an evening in the latter part of May a middle-aged man '
                'was walking homeward from Shaston to the village of Marlott, '
                'in the adjoining Vale of Blakemore, or Blackmoor.'
            ),
        )
        cls.b3 = Book.objects.create(
            title='Another awesome book', author=cls.a1, paperback=False,
            opening_line='A squat grey building of only thirty-four stories.',
        )
        cls.b4 = Book.objects.create(
            title='Some other book', author=cls.a3, paperback=True,
            opening_line='It was the day my grandmother exploded.',
        )
        cls.c1 = Coffee.objects.create(brand='dunkin doughnuts')
        cls.c2 = Coffee.objects.create(brand='starbucks')
        cls.r1 = Reviewer.objects.create()
        cls.r2 = Reviewer.objects.create()
        cls.r1.reviewed.add(cls.b2, cls.b3, cls.b4)

    def assertSuccessfulRawQuery(self, model, query, expected_results,
                                 expected_annotations=(), params=(), translations=None):
        """
        Execute the passed query against the passed model and check the output.

        `expected_results` holds the instances the raw query should return, in
        order. `expected_annotations` is a sequence of (name, value) pairs, one
        per returned row. `params` and `translations` are forwarded unchanged
        to `model.objects.raw()`.
        """
        # `params` defaults to an immutable empty tuple rather than a list so
        # that no mutable object is shared between calls.
        results = list(model.objects.raw(query, params=params, translations=translations))
        self.assertProcessed(model, results, expected_results, expected_annotations)
        self.assertAnnotations(results, expected_annotations)

    def assertProcessed(self, model, results, orig, expected_annotations=()):
        """
        Compare the results of a raw query against expected results.

        Every field in `model._meta.fields` must have an equal value of the
        same type on each result and on the expected instance at the same
        position in `orig`.
        """
        self.assertEqual(len(results), len(orig))
        for item, orig_item in zip(results, orig):
            # Copy the annotations onto the expected instance; only the
            # model's own fields are compared below.
            for annotation in expected_annotations:
                setattr(orig_item, *annotation)

            for field in model._meta.fields:
                actual = getattr(item, field.attname)
                expected = getattr(orig_item, field.attname)
                # All values on the model are equal
                self.assertEqual(actual, expected)
                # This includes checking that they are the same type
                self.assertEqual(type(actual), type(expected))

    def assertNoAnnotations(self, results):
        """
        The results of a raw query contain no annotations
        """
        # NOTE(review): assertAnnotations() skips all checks when given an
        # empty sequence, so this call currently asserts nothing.
        self.assertAnnotations(results, ())

    def assertAnnotations(self, results, expected_annotations):
        """
        The passed raw query results contain the expected annotations.

        `expected_annotations[i]` is the (name, value) pair expected on
        `results[i]`. An empty `expected_annotations` disables the check.
        """
        if not expected_annotations:
            return
        for index, result in enumerate(results):
            annotation, value = expected_annotations[index]
            self.assertTrue(hasattr(result, annotation))
            self.assertEqual(getattr(result, annotation), value)

    def test_rawqueryset_repr(self):
        """
        The repr() of a RawQuerySet and of its query include the raw SQL.
        """
        queryset = RawQuerySet(raw_query='SELECT * FROM raw_query_author')
        self.assertEqual(repr(queryset), '<RawQuerySet: SELECT * FROM raw_query_author>')
        self.assertEqual(repr(queryset.query), '<RawQuery: SELECT * FROM raw_query_author>')

    def test_simple_raw_query(self):
        """
        Basic test of raw query with a simple database query
        """
        query = "SELECT * FROM raw_query_author"
        authors = Author.objects.all()
        self.assertSuccessfulRawQuery(Author, query, authors)

    def test_raw_query_lazy(self):
        """
        Raw queries are lazy: they aren't actually executed until they're
        iterated over.
        """
        q = Author.objects.raw('SELECT * FROM raw_query_author')
        self.assertIsNone(q.query.cursor)
        list(q)
        self.assertIsNotNone(q.query.cursor)

    def test_FK_raw_query(self):
        """
        Test of a simple raw query against a model containing a foreign key
        """
        query = "SELECT * FROM raw_query_book"
        books = Book.objects.all()
        self.assertSuccessfulRawQuery(Book, query, books)

    def test_db_column_handler(self):
        """
        Test of a simple raw query against a model containing a field with
        db_column defined.
        """
        query = "SELECT * FROM raw_query_coffee"
        coffees = Coffee.objects.all()
        self.assertSuccessfulRawQuery(Coffee, query, coffees)

    def test_pk_with_mixed_case_db_column(self):
        """
        A raw query with a model that has a pk db_column with mixed case.
        """
        query = "SELECT * FROM raw_query_mixedcaseidcolumn"
        queryset = MixedCaseIDColumn.objects.all()
        self.assertSuccessfulRawQuery(MixedCaseIDColumn, query, queryset)

    def test_order_handler(self):
        """
        Test of raw query's tolerance for columns being returned in any
        order
        """
        selects = (
            'dob, last_name, first_name, id',
            'last_name, dob, first_name, id',
            'first_name, last_name, dob, id',
        )
        for select in selects:
            # Report each column ordering separately so that one failure
            # doesn't hide the others.
            with self.subTest(select=select):
                query = "SELECT %s FROM raw_query_author" % select
                authors = Author.objects.all()
                self.assertSuccessfulRawQuery(Author, query, authors)

    def test_translations(self):
        """
        Test of raw query's optional ability to translate unexpected result
        column names to specific model fields
        """
        query = "SELECT first_name AS first, last_name AS last, dob, id FROM raw_query_author"
        translations = {'first': 'first_name', 'last': 'last_name'}
        authors = Author.objects.all()
        self.assertSuccessfulRawQuery(Author, query, authors, translations=translations)

    def test_params(self):
        """
        Test passing optional query parameters
        """
        query = "SELECT * FROM raw_query_author WHERE first_name = %s"
        author = Author.objects.all()[2]
        params = [author.first_name]
        qset = Author.objects.raw(query, params=params)
        results = list(qset)
        self.assertProcessed(Author, results, [author])
        self.assertNoAnnotations(results)
        self.assertEqual(len(results), 1)
        self.assertIsInstance(repr(qset), str)

    def test_params_none(self):
        """
        A query containing a literal percent sign runs when params=None is
        passed explicitly.
        """
        query = "SELECT * FROM raw_query_author WHERE first_name like 'J%'"
        qset = Author.objects.raw(query, params=None)
        self.assertEqual(len(qset), 2)

    def test_escaped_percent(self):
        """
        A query containing a doubled percent sign runs when no params are
        passed.
        """
        query = "SELECT * FROM raw_query_author WHERE first_name like 'J%%'"
        qset = Author.objects.raw(query)
        self.assertEqual(len(qset), 2)

    @skipUnlessDBFeature('supports_paramstyle_pyformat')
    def test_pyformat_params(self):
        """
        Test passing optional query parameters as a mapping, using named
        (pyformat) placeholders
        """
        query = "SELECT * FROM raw_query_author WHERE first_name = %(first)s"
        author = Author.objects.all()[2]
        params = {'first': author.first_name}
        qset = Author.objects.raw(query, params=params)
        results = list(qset)
        self.assertProcessed(Author, results, [author])
        self.assertNoAnnotations(results)
        self.assertEqual(len(results), 1)
        self.assertIsInstance(repr(qset), str)

    def test_query_representation(self):
        """
        Test representation of raw query with parameters
        """
        # Named placeholder with a mapping of parameters.
        query = "SELECT * FROM raw_query_author WHERE last_name = %(last)s"
        qset = Author.objects.raw(query, {'last': 'foo'})
        self.assertEqual(repr(qset), "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>")
        self.assertEqual(repr(qset.query), "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>")
        # Positional placeholder with a one-element set of parameters.
        query = "SELECT * FROM raw_query_author WHERE last_name = %s"
        qset = Author.objects.raw(query, {'foo'})
        self.assertEqual(repr(qset), "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>")
        self.assertEqual(repr(qset.query), "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>")

    def test_many_to_many(self):
        """
        Test of a simple raw query against a model containing a m2m field
        """
        query = "SELECT * FROM raw_query_reviewer"
        reviewers = Reviewer.objects.all()
        self.assertSuccessfulRawQuery(Reviewer, query, reviewers)

    def test_extra_conversions(self):
        """
        Test to ensure that extra translations are ignored.
        """
        query = "SELECT * FROM raw_query_author"
        translations = {'something': 'else'}
        authors = Author.objects.all()
        self.assertSuccessfulRawQuery(Author, query, authors, translations=translations)

    def test_missing_fields(self):
        """
        Fields left out of the SELECT list are still available on the
        returned instances.
        """
        query = "SELECT id, first_name, dob FROM raw_query_author"
        for author in Author.objects.raw(query):
            self.assertIsNotNone(author.first_name)
            # last_name isn't given, but it will be retrieved on demand
            self.assertIsNotNone(author.last_name)

    def test_missing_fields_without_PK(self):
        """
        Iterating a raw query that doesn't select the primary key raises
        FieldDoesNotExist.
        """
        query = "SELECT first_name, dob FROM raw_query_author"
        msg = 'Raw query must include the primary key'
        with self.assertRaisesMessage(FieldDoesNotExist, msg):
            list(Author.objects.raw(query))

    def test_annotations(self):
        """
        Extra columns in the SELECT list are exposed as attributes on the
        returned instances.
        """
        query = (
            "SELECT a.*, count(b.id) as book_count "
            "FROM raw_query_author a "
            "LEFT JOIN raw_query_book b ON a.id = b.author_id "
            "GROUP BY a.id, a.first_name, a.last_name, a.dob ORDER BY a.id"
        )
        # One (name, value) pair per author, in the order the query returns
        # them.
        expected_annotations = (
            ('book_count', 3),
            ('book_count', 0),
            ('book_count', 1),
            ('book_count', 0),
        )
        authors = Author.objects.all()
        self.assertSuccessfulRawQuery(Author, query, authors, expected_annotations)

    def test_white_space_query(self):
        """
        A query with leading whitespace is processed like any other query.
        """
        query = " SELECT * FROM raw_query_author"
        authors = Author.objects.all()
        self.assertSuccessfulRawQuery(Author, query, authors)

    def test_multiple_iterations(self):
        """
        Iterating over the same RawQuerySet twice yields the same objects
        both times.
        """
        query = "SELECT * FROM raw_query_author"
        normal_authors = Author.objects.all()
        raw_authors = Author.objects.raw(query)

        # First Iteration
        first_iterations = 0
        for index, raw_author in enumerate(raw_authors):
            self.assertEqual(normal_authors[index], raw_author)
            first_iterations += 1

        # Second Iteration
        second_iterations = 0
        for index, raw_author in enumerate(raw_authors):
            self.assertEqual(normal_authors[index], raw_author)
            second_iterations += 1

        self.assertEqual(first_iterations, second_iterations)

    def test_get_item(self):
        """
        RawQuerySets support integer indexing and slicing; a non-integer
        index raises TypeError.
        """
        # Indexing on RawQuerySets
        query = "SELECT * FROM raw_query_author ORDER BY id ASC"
        third_author = Author.objects.raw(query)[2]
        self.assertEqual(third_author.first_name, 'Bob')

        first_two = Author.objects.raw(query)[0:2]
        self.assertEqual(len(first_two), 2)

        with self.assertRaises(TypeError):
            Author.objects.raw(query)['test']

    def test_inheritance(self):
        """
        A raw query against the FriendlyAuthor table returns the instance
        created through that model.
        """
        f = FriendlyAuthor.objects.create(first_name="Wesley", last_name="Chun", dob=date(1962, 10, 28))
        query = "SELECT * FROM raw_query_friendlyauthor"
        self.assertEqual(
            [o.pk for o in FriendlyAuthor.objects.raw(query)], [f.pk]
        )

    def test_query_count(self):
        """
        Evaluating a raw query with list() runs exactly one database query.
        """
        self.assertNumQueries(1, list, Author.objects.raw("SELECT * FROM raw_query_author"))

    def test_subquery_in_raw_sql(self):
        """
        A raw query that selects from a subquery can be evaluated.
        """
        list(Book.objects.raw('SELECT id FROM (SELECT * FROM raw_query_book WHERE paperback IS NOT NULL) sq'))

    def test_db_column_name_is_used_in_raw_query(self):
        """
        Regression test that ensures the `column` attribute on the field is
        used to generate the list of fields included in the query, as opposed
        to the `attname`. This is important when the primary key is a
        ForeignKey field because `attname` and `column` are not necessarily the
        same.
        """
        b = BookFkAsPk.objects.create(book=self.b1)
        self.assertEqual(list(BookFkAsPk.objects.raw('SELECT not_the_default FROM raw_query_bookfkaspk')), [b])

    def test_decimal_parameter(self):
        """
        A Decimal can be passed as a query parameter.
        """
        c = Coffee.objects.create(brand='starbucks', price=20.5)
        qs = Coffee.objects.raw("SELECT * FROM raw_query_coffee WHERE price >= %s", params=[Decimal(20)])
        self.assertEqual(list(qs), [c])

    def test_result_caching(self):
        """
        Evaluating the same RawQuerySet twice runs only one database query.
        """
        with self.assertNumQueries(1):
            books = Book.objects.raw('SELECT * FROM raw_query_book')
            list(books)
            list(books)

    def test_iterator(self):
        """
        Each call to iterator() runs its own database query.
        """
        with self.assertNumQueries(2):
            books = Book.objects.raw('SELECT * FROM raw_query_book')
            list(books.iterator())
            list(books.iterator())

    def test_bool(self):
        """
        A RawQuerySet is truthy when it returns rows and falsy when it
        returns none.
        """
        self.assertIs(bool(Book.objects.raw('SELECT * FROM raw_query_book')), True)
        self.assertIs(bool(Book.objects.raw('SELECT * FROM raw_query_book WHERE id = 0')), False)

    def test_len(self):
        """
        len() of a RawQuerySet is the number of rows the query returns.
        """
        self.assertEqual(len(Book.objects.raw('SELECT * FROM raw_query_book')), 4)
        self.assertEqual(len(Book.objects.raw('SELECT * FROM raw_query_book WHERE id = 0')), 0)
|