1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
|
import datetime
from decimal import Decimal
from pathlib import Path
from typing import AbstractSet, Mapping
from uuid import uuid4
import pytest
from pydantic import BaseModel, ValidationError
from beanie import Document
from beanie.exceptions import CollectionWasNotInitialized
from beanie.odm.fields import PydanticObjectId
from beanie.odm.utils.dump import get_dict
from beanie.odm.utils.encoder import Encoder
from beanie.odm.utils.pydantic import IS_PYDANTIC_V2
from tests.odm.models import (
DocumentTestModel,
DocumentTestModelIndexFlagsAnnotated,
DocumentWithBsonEncodersFiledsTypes,
DocumentWithCustomFiledsTypes,
DocumentWithDeprecatedHiddenField,
Sample,
)
# Minimal pydantic model with a single `PydanticObjectId` field; the
# ObjectId validation tests in this module instantiate it with various inputs.
class M(BaseModel):
    # The only field: the value under validation.
    p: PydanticObjectId
def test_pydantic_object_id_wrong_input():
    """Building ``M`` from the plain string ``"test"`` must fail validation."""
    invalid_value = "test"
    with pytest.raises(ValidationError):
        M(p=invalid_value)
def test_pydantic_object_id_bytes_input():
    """Check how ``M.p`` handles ``bytes`` input.

    The UTF-8 encoded string form of a ``PydanticObjectId`` must validate
    back to an equal id, while the arbitrary bytes ``b"test"`` must be
    rejected with a ``ValidationError``.
    """
    expected_id = PydanticObjectId()
    encoded_id = str(expected_id).encode("utf-8")

    model = M(p=encoded_id)
    assert model.p == expected_id

    with pytest.raises(ValidationError):
        M(p=b"test")
async def test_bson_encoders_filed_types():
    """Round-trip a ``DocumentWithBsonEncodersFiledsTypes`` through the database.

    Verifies that ``get_dict`` renders the timestamp as a string, and that
    after insert + fetch the colour and the timestamp equal the values of
    the document that was stored.
    """
    custom = DocumentWithBsonEncodersFiledsTypes(
        color="7fffd4",
        timestamp=datetime.datetime.now(tz=datetime.timezone.utc),
    )

    encoded = get_dict(custom)
    assert isinstance(encoded["timestamp"], str)

    c = await custom.insert()
    c_fromdb = await DocumentWithBsonEncodersFiledsTypes.get(c.id)

    assert c_fromdb.color.as_hex() == c.color.as_hex()
    assert isinstance(c_fromdb.timestamp, datetime.datetime)
    # Compare the values explicitly: `assert a, b` only checks that `a` is
    # truthy and uses `b` as the failure message, so it can never fail here.
    # NOTE(review): assumes the model's timestamp encoder keeps full
    # precision through the round trip - confirm against the model settings.
    assert c_fromdb.timestamp == custom.timestamp
async def test_custom_filed_types():
    """Insert two ``DocumentWithCustomFiledsTypes`` and compare them after reload.

    Both documents share most field values and differ in ``color``,
    ``decimal``, ``set_type``, ``tuple_type`` and ``path``. After a round
    trip through the database the encoded form of each fetched document
    must equal the encoded form of the inserted one, and the decimal value
    must survive encoding.
    """
    # Field values that are identical for both documents.
    shared_fields = dict(
        secret_bytes=b"secret_bytes",
        secret_string="super_secret_password",
        ipv4address="127.0.0.1",
        ipv4interface="192.0.2.5/24",
        ipv4network="192.0.2.0/24",
        ipv6address="::abc:7:def",
        ipv6interface="2001:db00::2/24",
        ipv6network="2001:db00::0/24",
        timedelta=4782453,
    )
    custom1 = DocumentWithCustomFiledsTypes(
        color="#753c38",
        decimal=500,
        set_type={"one", "two", "three"},
        tuple_type=(3, "string"),
        path="/etc/hosts",
        **shared_fields,
    )
    custom2 = DocumentWithCustomFiledsTypes(
        color="magenta",
        decimal=Decimal("3.14") + Decimal(10) ** Decimal(-18),
        set_type=["one", "two", "three"],
        tuple_type=[3, "three"],
        path=Path("C:\\Windows"),
        **shared_fields,
    )

    c1 = await custom1.insert()
    c2 = await custom2.insert()
    c1_fromdb = await DocumentWithCustomFiledsTypes.get(c1.id)
    c2_fromdb = await DocumentWithCustomFiledsTypes.get(c2.id)

    # The set fields are compared as sets first and then blanked out on all
    # four documents before the encoded comparison below.
    # NOTE(review): presumably because element order of an encoded set is
    # not guaranteed to match - confirm.
    for stored, fetched in ((c1, c1_fromdb), (c2, c2_fromdb)):
        assert set(fetched.set_type) == set(stored.set_type)
    for doc in (c1_fromdb, c2_fromdb, c1, c2):
        doc.set_type = None

    # Revision ids are cleared on the fetched copies before comparing.
    for fetched in (c1_fromdb, c2_fromdb):
        fetched.revision_id = None

    c1_encoded, c1_fromdb_encoded, c2_encoded, c2_fromdb_encoded = (
        Encoder().encode(doc) for doc in (c1, c1_fromdb, c2, c2_fromdb)
    )
    assert c1_fromdb_encoded == c1_encoded
    assert c2_fromdb_encoded == c2_encoded

    # The decimal must have the same value before and after encoding.
    for original, encoded in ((custom1, c1_encoded), (custom2, c2_encoded)):
        before = Decimal(str(original.decimal))
        after = Decimal(str(encoded.get("decimal")))
        assert before == after
async def test_excluded(document):
    """The ``test_list`` key must be absent from a dumped ``DocumentTestModel``.

    NOTE(review): presumably the ``document`` fixture stores the document
    that ``find_one`` returns - confirm against the fixture definition.
    """
    document = await DocumentTestModel.find_one()
    # pydantic v2 renamed `dict()` to `model_dump()`.
    dumped = document.model_dump() if IS_PYDANTIC_V2 else document.dict()
    assert "test_list" not in dumped
async def test_hidden(deprecated_init_beanie):
    """The ``test_hidden`` key must be absent from the dumped document.

    Inserts a ``DocumentWithDeprecatedHiddenField``, fetches one back and
    checks its dumped dictionary.
    """
    to_store = DocumentWithDeprecatedHiddenField(test_hidden=["abc", "def"])
    await to_store.insert()

    fetched = await DocumentWithDeprecatedHiddenField.find_one()
    # pydantic v2 renamed `dict()` to `model_dump()`.
    dumped = fetched.model_dump() if IS_PYDANTIC_V2 else fetched.dict()
    assert "test_hidden" not in dumped
def test_revision_id_not_in_schema():
    """Ensure `revision_id` does not leak into a document's JSON schema."""

    class Foo(Document):
        """Dummy document."""

        bar: int = 3

    # Pick the schema generator matching the installed pydantic major version.
    build_schema = Foo.model_json_schema if IS_PYDANTIC_V2 else Foo.schema
    schema = build_schema()
    assert "revision_id" not in schema["properties"]

    # Confirm the document was never initialized: once it is, `revision_id`
    # is normally gone from the schema anyway, which would make the check
    # above meaningless.
    with pytest.raises(CollectionWasNotInitialized):
        Foo.get_settings()
@pytest.mark.parametrize("exclude", [{"test_int"}, {"test_doc": {"test_int"}}])
async def test_param_exclude(document, exclude):
    """Dumping with ``exclude`` must drop the requested keys.

    ``exclude`` is either a set of top-level field names or a mapping from
    a field name to ``True`` (drop the whole field) or to a set of nested
    field names to drop from that field.
    """
    document = await DocumentTestModel.find_one()
    # pydantic v2 renamed `dict()` to `model_dump()`.
    dump = document.model_dump if IS_PYDANTIC_V2 else document.dict
    doc_dict = dump(exclude=exclude)

    if isinstance(exclude, AbstractSet):
        for field_name in exclude:
            assert field_name not in doc_dict
        return

    if not isinstance(exclude, Mapping):
        return

    for field_name, nested in exclude.items():
        if nested is True:
            assert field_name not in doc_dict
        elif isinstance(nested, AbstractSet):
            sub_dict = doc_dict[field_name]
            for nested_name in nested:
                assert nested_name not in sub_dict
def test_expression_fields():
    """Attribute and item access on ``Sample.nested`` both equal ``"nested.integer"``."""
    expected_path = "nested.integer"

    via_attribute = Sample.nested.integer
    assert via_attribute == expected_path

    via_item = Sample.nested["integer"]
    assert via_item == expected_path
def test_indexed_field() -> None:
    """Check that a document with indexed fields can be instantiated.

    Covers fields declared with ``Indexed()`` as well as with
    ``Annotated[..., Indexed()]``.
    """
    field_values = {
        "str_index": "test",
        "str_index_annotated": "test",
        "uuid_index": uuid4(),
        "uuid_index_annotated": uuid4(),
    }
    # The construction itself is the check: no error should be raised when
    # the document is properly initialized and `Indexed` is implemented
    # correctly.
    DocumentTestModelIndexFlagsAnnotated(**field_values)
|