File: parsing.py

package info (click to toggle)
python-beanie 2.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,484 kB
  • sloc: python: 14,427; makefile: 6; sh: 6
file content (148 lines) | stat: -rw-r--r-- 5,109 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from typing import TYPE_CHECKING, Any, Dict, Type, Union

from pydantic import BaseModel

from beanie.exceptions import (
    ApplyChangesException,
    DocWasNotRegisteredInUnionClass,
    UnionHasNoRegisteredDocs,
)
from beanie.odm.interfaces.detector import ModelType
from beanie.odm.utils.pydantic import get_config_value, parse_model

if TYPE_CHECKING:
    from beanie.odm.documents import Document


def merge_models(left: BaseModel, right: BaseModel) -> None:
    """
    Merge two models
    :param left: left model
    :param right: right model
    :return: None
    """
    from beanie.odm.fields import Link

    for k, right_value in right.__iter__():
        left_value = getattr(left, k, None)
        if isinstance(right_value, BaseModel) and isinstance(
            left_value, BaseModel
        ):
            if get_config_value(left_value, "frozen"):
                left.__setattr__(k, right_value)
            else:
                merge_models(left_value, right_value)
            continue
        if isinstance(right_value, list):
            links_found = False
            for i in right_value:
                if isinstance(i, Link):
                    links_found = True
                    break
            if links_found:
                continue
            left.__setattr__(k, right_value)
        elif not isinstance(right_value, Link):
            left.__setattr__(k, right_value)


def apply_changes(
    changes: Dict[str, Any], target: Union[BaseModel, Dict[str, Any]]
):
    for key, value in changes.items():
        if "." in key:
            key_parts = key.split(".")
            current_target = target
            try:
                for part in key_parts[:-1]:
                    if isinstance(current_target, dict):
                        current_target = current_target[part]
                    elif isinstance(current_target, BaseModel):
                        current_target = getattr(current_target, part)
                    else:
                        raise ApplyChangesException(
                            f"Unexpected type of target: {type(target)}"
                        )
                final_key = key_parts[-1]
                if isinstance(current_target, dict):
                    current_target[final_key] = value
                elif isinstance(current_target, BaseModel):
                    setattr(current_target, final_key, value)
                else:
                    raise ApplyChangesException(
                        f"Unexpected type of target: {type(target)}"
                    )
            except (KeyError, AttributeError) as e:
                raise ApplyChangesException(
                    f"Failed to apply change for key '{key}': {e}"
                )
        else:
            if isinstance(target, dict):
                target[key] = value
            elif isinstance(target, BaseModel):
                setattr(target, key, value)
            else:
                raise ApplyChangesException(
                    f"Unexpected type of target: {type(target)}"
                )


def save_state(item: BaseModel):
    if hasattr(item, "_save_state"):
        item._save_state()  # type: ignore


def parse_obj(
    model: Union[Type[BaseModel], Type["Document"]],
    data: Any,
    lazy_parse: bool = False,
) -> BaseModel:
    if (
        hasattr(model, "get_model_type")
        and model.get_model_type() == ModelType.UnionDoc  # type: ignore
    ):
        if model._document_models is None:  # type: ignore
            raise UnionHasNoRegisteredDocs

        if isinstance(data, dict):
            class_name = data[model.get_settings().class_id]  # type: ignore
        else:
            class_name = data._class_id

        if class_name not in model._document_models:  # type: ignore
            raise DocWasNotRegisteredInUnionClass
        return parse_obj(
            model=model._document_models[class_name],  # type: ignore
            data=data,
            lazy_parse=lazy_parse,
        )  # type: ignore
    if (
        hasattr(model, "get_model_type")
        and model.get_model_type() == ModelType.Document  # type: ignore
        and model._inheritance_inited  # type: ignore
    ):
        if isinstance(data, dict):
            class_name = data.get(model.get_settings().class_id)  # type: ignore
        elif hasattr(data, model.get_settings().class_id):  # type: ignore
            class_name = data._class_id
        else:
            class_name = None

        if model._children and class_name in model._children:  # type: ignore
            return parse_obj(
                model=model._children[class_name],  # type: ignore
                data=data,
                lazy_parse=lazy_parse,
            )  # type: ignore

    if (
        lazy_parse
        and hasattr(model, "get_model_type")
        and model.get_model_type() == ModelType.Document  # type: ignore
    ):
        o = model.lazy_parse(data, {"_id"})  # type: ignore
        o._saved_state = {"_id": o.id}
        return o
    result = parse_model(model, data)
    save_state(result)
    return result