File: test_fields.py

package info (click to toggle)
python-beanie 2.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,480 kB
  • sloc: python: 14,427; makefile: 7; sh: 6
file content (186 lines) | stat: -rw-r--r-- 6,082 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import datetime
from decimal import Decimal
from pathlib import Path
from typing import AbstractSet, Mapping
from uuid import uuid4

import pytest
from pydantic import BaseModel, ValidationError

from beanie import Document
from beanie.exceptions import CollectionWasNotInitialized
from beanie.odm.fields import PydanticObjectId
from beanie.odm.utils.dump import get_dict
from beanie.odm.utils.encoder import Encoder
from beanie.odm.utils.pydantic import IS_PYDANTIC_V2
from tests.odm.models import (
    DocumentTestModel,
    DocumentTestModelIndexFlagsAnnotated,
    DocumentWithBsonEncodersFiledsTypes,
    DocumentWithCustomFiledsTypes,
    DocumentWithDeprecatedHiddenField,
    Sample,
)


class M(BaseModel):
    p: PydanticObjectId


def test_pydantic_object_id_wrong_input():
    with pytest.raises(ValidationError):
        M(p="test")


def test_pydantic_object_id_bytes_input():
    p = PydanticObjectId()
    m = M(p=str(p).encode("utf-8"))
    assert m.p == p
    with pytest.raises(ValidationError):
        M(p=b"test")


async def test_bson_encoders_filed_types():
    custom = DocumentWithBsonEncodersFiledsTypes(
        color="7fffd4",
        timestamp=datetime.datetime.now(tz=datetime.timezone.utc),
    )
    encoded = get_dict(custom)
    assert isinstance(encoded["timestamp"], str)
    c = await custom.insert()
    c_fromdb = await DocumentWithBsonEncodersFiledsTypes.get(c.id)
    assert c_fromdb.color.as_hex() == c.color.as_hex()
    assert isinstance(c_fromdb.timestamp, datetime.datetime)
    assert c_fromdb.timestamp, custom.timestamp


async def test_custom_filed_types():
    custom1 = DocumentWithCustomFiledsTypes(
        color="#753c38",
        decimal=500,
        secret_bytes=b"secret_bytes",
        secret_string="super_secret_password",
        ipv4address="127.0.0.1",
        ipv4interface="192.0.2.5/24",
        ipv4network="192.0.2.0/24",
        ipv6address="::abc:7:def",
        ipv6interface="2001:db00::2/24",
        ipv6network="2001:db00::0/24",
        timedelta=4782453,
        set_type={"one", "two", "three"},
        tuple_type=tuple([3, "string"]),
        path="/etc/hosts",
    )
    custom2 = DocumentWithCustomFiledsTypes(
        color="magenta",
        decimal=Decimal("3.14") + Decimal(10) ** Decimal(-18),
        secret_bytes=b"secret_bytes",
        secret_string="super_secret_password",
        ipv4address="127.0.0.1",
        ipv4interface="192.0.2.5/24",
        ipv4network="192.0.2.0/24",
        ipv6address="::abc:7:def",
        ipv6interface="2001:db00::2/24",
        ipv6network="2001:db00::0/24",
        timedelta=4782453,
        set_type=["one", "two", "three"],
        tuple_type=[3, "three"],
        path=Path("C:\\Windows"),
    )
    c1 = await custom1.insert()
    c2 = await custom2.insert()
    c1_fromdb = await DocumentWithCustomFiledsTypes.get(c1.id)
    c2_fromdb = await DocumentWithCustomFiledsTypes.get(c2.id)
    assert set(c1_fromdb.set_type) == set(c1.set_type)
    assert set(c2_fromdb.set_type) == set(c2.set_type)
    c1_fromdb.set_type = c2_fromdb.set_type = c1.set_type = c2.set_type = None
    c1_fromdb.revision_id = None
    c2_fromdb.revision_id = None
    c1_encoded = Encoder().encode(c1)
    c1_fromdb_encoded = Encoder().encode(c1_fromdb)
    c2_encoded = Encoder().encode(c2)
    c2_fromdb_encoded = Encoder().encode(c2_fromdb)
    assert c1_fromdb_encoded == c1_encoded
    assert c2_fromdb_encoded == c2_encoded
    assert Decimal(str(custom1.decimal)) == Decimal(
        str(c1_encoded.get("decimal"))
    )
    assert Decimal(str(custom2.decimal)) == Decimal(
        str(c2_encoded.get("decimal"))
    )


async def test_excluded(document):
    document = await DocumentTestModel.find_one()
    if IS_PYDANTIC_V2:
        assert "test_list" not in document.model_dump()
    else:
        assert "test_list" not in document.dict()


async def test_hidden(deprecated_init_beanie):
    document = DocumentWithDeprecatedHiddenField(test_hidden=["abc", "def"])
    await document.insert()
    document = await DocumentWithDeprecatedHiddenField.find_one()
    if IS_PYDANTIC_V2:
        assert "test_hidden" not in document.model_dump()
    else:
        assert "test_hidden" not in document.dict()


def test_revision_id_not_in_schema():
    """Check if there is a `revision_id` slipping into the schema."""

    class Foo(Document):
        """Dummy document."""

        bar: int = 3

    if IS_PYDANTIC_V2:
        schema = Foo.model_json_schema()
    else:
        schema = Foo.schema()
    assert "revision_id" not in schema["properties"]

    # check that the document has not been initialized,
    # as otherwise the `revision_id` is normally gone from the schema.
    with pytest.raises(CollectionWasNotInitialized):
        Foo.get_settings()


@pytest.mark.parametrize("exclude", [{"test_int"}, {"test_doc": {"test_int"}}])
async def test_param_exclude(document, exclude):
    document = await DocumentTestModel.find_one()
    if IS_PYDANTIC_V2:
        doc_dict = document.model_dump(exclude=exclude)
    else:
        doc_dict = document.dict(exclude=exclude)
    if isinstance(exclude, AbstractSet):
        for k in exclude:
            assert k not in doc_dict
    elif isinstance(exclude, Mapping):
        for k, v in exclude.items():
            if isinstance(v, bool) and v:
                assert k not in doc_dict
            elif isinstance(v, AbstractSet):
                for another_k in v:
                    assert another_k not in doc_dict[k]


def test_expression_fields():
    assert Sample.nested.integer == "nested.integer"
    assert Sample.nested["integer"] == "nested.integer"


def test_indexed_field() -> None:
    """Test that fields can be declared and instantiated with Indexed()
    and Annotated[..., Indexed()]."""

    # No error should be raised the document is properly initialized
    # and `Indexed` is implemented correctly.
    DocumentTestModelIndexFlagsAnnotated(
        str_index="test",
        str_index_annotated="test",
        uuid_index=uuid4(),
        uuid_index_annotated=uuid4(),
    )