1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
|
from functools import reduce
from itertools import chain
from typing import Any, Dict, List, Optional, no_type_check
from django.core.serializers.json import DjangoJSONEncoder
from django.db.models import Manager, Model
from django.db.models.fields.files import ImageFieldFile
from django.db.models.fields.reverse_related import (ForeignObjectRel,
OneToOneRel)
from django.utils.encoding import force_str
from django.utils.functional import Promise
from pydantic.v1 import BaseModel, ConfigError, create_model
from pydantic.v1.main import ModelMetaclass
from pydantic.v1.utils import GetterDict
from .fields import ModelSchemaField
_is_base_model_class_defined = False
class ModelSchemaJSONEncoder(DjangoJSONEncoder):
@no_type_check
def default(self, obj): # pragma: nocover
if isinstance(obj, Promise):
return force_str(obj)
return super().default(obj)
def get_field_name(field) -> str:
if issubclass(field.__class__, ForeignObjectRel) and not issubclass(field.__class__, OneToOneRel):
return getattr(field, "related_name", None) or f"{field.name}_set"
else:
return getattr(field, "name", field)
class ModelSchemaMetaclass(ModelMetaclass):
@no_type_check
def __new__(
mcs,
name: str,
bases: tuple,
namespace: dict,
):
cls = super().__new__(mcs, name, bases, namespace)
for base in reversed(bases):
if (
_is_base_model_class_defined
and issubclass(base, ModelSchema)
and base == ModelSchema
):
try:
config = namespace["Config"]
except KeyError as exc:
raise ConfigError(
f"{exc} (Is `Config` class defined?)"
)
include = getattr(config, "include", None)
exclude = getattr(config, "exclude", None)
if include and exclude:
raise ConfigError(
"Only one of 'include' or 'exclude' should be set in "
"configuration."
)
annotations = namespace.get("__annotations__", {})
try:
fields = config.model._meta.get_fields()
except AttributeError as exc:
raise ConfigError(
f"{exc} (Is `Config.model` a valid Django model class?)"
)
if include == '__annotations__':
include = list(annotations.keys())
cls.__config__.include = include
elif include is None and exclude is None:
include = list(annotations.keys()) + [get_field_name(f) for f in fields]
cls.__config__.include = include
field_values = {}
_seen = set()
for field in chain(fields, annotations.copy()):
field_name = get_field_name(field)
if (
field_name in _seen
or (include and field_name not in include)
or (exclude and field_name in exclude)
):
continue
_seen.add(field_name)
python_type = None
pydantic_field = None
if field_name in annotations and field_name in namespace:
python_type = annotations.pop(field_name)
pydantic_field = namespace[field_name]
if (
hasattr(pydantic_field, "default_factory")
and pydantic_field.default_factory
):
pydantic_field = pydantic_field.default_factory()
elif field_name in annotations:
python_type = annotations.pop(field_name)
pydantic_field = (
None if Optional[python_type] == python_type else Ellipsis
)
else:
python_type, pydantic_field = ModelSchemaField(field, name)
field_values[field_name] = (python_type, pydantic_field)
cls.__doc__ = namespace.get("__doc__", config.model.__doc__)
cls.__fields__ = {}
cls.__alias_map__ = {getattr(model_field[1], 'alias', None) or field_name: field_name
for field_name, model_field in field_values.items()}
model_schema = create_model(
name, __base__=cls, __module__=cls.__module__, **field_values
)
return model_schema
return cls
class ProxyGetterNestedObj(GetterDict):
def __init__(self, obj: Any, schema_class):
self._obj = obj
self.schema_class = schema_class
def get(self, key: Any, default: Any = None) -> Any:
alias = self.schema_class.__alias_map__[key]
outer_type_ = self.schema_class.__fields__[alias].outer_type_
if "__" in key:
# Allow double underscores aliases: `first_name: str = Field(alias="user__first_name")`
keys_map = key.split("__")
attr = reduce(lambda a, b: getattr(a, b, default), keys_map, self._obj)
else:
attr = getattr(self._obj, key, None)
is_manager = issubclass(attr.__class__, Manager)
if is_manager and outer_type_ == List[Dict[str, int]]:
attr = list(attr.all().values("id"))
elif is_manager:
attr = list(attr.all())
elif outer_type_ == int and issubclass(type(attr), Model):
attr = attr.id
elif issubclass(attr.__class__, ImageFieldFile) and issubclass(outer_type_, str):
attr = attr.name
return attr
class ModelSchema(BaseModel, metaclass=ModelSchemaMetaclass):
class Config:
orm_mode = True
@classmethod
def schema_json(
cls,
*,
by_alias: bool = True,
encoder_cls: Any = ModelSchemaJSONEncoder,
**dumps_kwargs: Any,
) -> str:
return cls.__config__.json_dumps(
cls.schema(by_alias=by_alias), cls=encoder_cls, **dumps_kwargs
)
@classmethod
@no_type_check
def get_field_names(cls) -> List[str]:
if hasattr(cls.__config__, "exclude"):
django_model_fields = cls.__config__.model._meta.get_fields()
all_fields = [f.name for f in django_model_fields]
return [
name for name in all_fields if name not in cls.__config__.exclude
]
return cls.__config__.include
@classmethod
def from_orm(cls, *args, **kwargs):
return cls.from_django(*args, **kwargs)
@classmethod
def from_django(cls, objs, many=False, context={}):
cls.context = context
if many:
result_objs = []
for obj in objs:
cls.instance = obj
result_objs.append(super().from_orm(ProxyGetterNestedObj(obj, cls)))
return result_objs
cls.instance = objs
return super().from_orm(ProxyGetterNestedObj(objs, cls))
_is_base_model_class_defined = True
|