1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
from typing import (
Any,
Dict,
Generator,
Iterable,
List,
Optional,
Type,
TypeVar,
cast,
)
from django.core.exceptions import FieldDoesNotExist
from django.db import connection, models
from django.db.models import Field, Model
from django.db.models.expressions import Expression
from .fields import inspect_model_local_concrete_fields
TModel = TypeVar("TModel", bound=models.Model)
def _construct_model(
model: Type[TModel],
columns: Iterable[str],
values: Iterable[Any],
*,
apply_converters: bool = True
) -> TModel:
fields_by_name_and_column = {}
for concrete_field in inspect_model_local_concrete_fields(model):
fields_by_name_and_column[concrete_field.attname] = concrete_field
if concrete_field.db_column:
fields_by_name_and_column[concrete_field.db_column] = concrete_field
indexable_columns = list(columns)
row = {}
for index, value in enumerate(values):
column = indexable_columns[index]
try:
field: Optional[Field] = cast(Field, model._meta.get_field(column))
except FieldDoesNotExist:
field = fields_by_name_and_column.get(column)
if not field:
continue
field_column_expression = field.get_col(model._meta.db_table)
if apply_converters:
converters = cast(Expression, field).get_db_converters(
connection
) + connection.ops.get_db_converters(field_column_expression)
converted_value = value
for converter in converters:
converted_value = converter(
converted_value,
field_column_expression,
connection,
)
else:
converted_value = value
row[field.attname] = converted_value
instance = model(**row)
instance._state.adding = False
instance._state.db = connection.alias
return instance
def models_from_cursor(
model: Type[TModel], cursor, *, related_fields: List[str] = []
) -> Generator[TModel, None, None]:
"""Fetches all rows from a cursor and converts the values into model
instances.
This is roughly what Django does internally when you do queries. This
goes further than `Model.from_db` as it also applies converters to make
sure that values are converted into their Python equivalent.
Use this when you've outgrown the ORM and you are writing performant
queries yourself and you need to map the results back into ORM objects.
Arguments:
model:
Model to construct.
cursor:
Cursor to read the rows from.
related_fields:
List of ForeignKey/OneToOneField names that were joined
into the raw query. Use this to achieve the same thing
that Django's `.select_related()` does.
Field names should be specified in the order that they
are SELECT'd in.
"""
columns = [col[0] for col in cursor.description]
field_offset = len(inspect_model_local_concrete_fields(model))
rows = cursor.fetchmany()
while rows:
for values in rows:
instance = _construct_model(
model, columns[:field_offset], values[:field_offset]
)
for index, related_field_name in enumerate(related_fields):
related_model = model._meta.get_field(
related_field_name
).related_model
if not related_model:
continue
related_field_count = len(
inspect_model_local_concrete_fields(related_model)
)
# autopep8: off
related_columns = columns[
field_offset : field_offset + related_field_count # noqa
]
related_values = values[
field_offset : field_offset + related_field_count # noqa
]
# autopep8: one
if (
not related_columns
or not related_values
or all([value is None for value in related_values])
):
continue
related_instance = _construct_model(
cast(Type[Model], related_model),
related_columns,
related_values,
)
instance._state.fields_cache[related_field_name] = related_instance # type: ignore
field_offset += len(
inspect_model_local_concrete_fields(related_model)
)
yield instance
rows = cursor.fetchmany()
def model_from_cursor(
model: Type[TModel], cursor, *, related_fields: List[str] = []
) -> Optional[TModel]:
return next(
models_from_cursor(model, cursor, related_fields=related_fields), None
)
def model_from_dict(
model: Type[TModel], row: Dict[str, Any], *, apply_converters: bool = True
) -> TModel:
return _construct_model(
model, row.keys(), row.values(), apply_converters=apply_converters
)
|