File: test_multi_model.py

package info (click to toggle)
python-beanie 2.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,480 kB
  • sloc: python: 14,427; makefile: 7; sh: 6
file content (67 lines) | stat: -rw-r--r-- 2,374 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from tests.odm.models import (
    DocumentMultiModelOne,
    DocumentMultiModelTwo,
    DocumentUnion,
)


class TestMultiModel:
    """Tests for DocumentMultiModelOne, DocumentMultiModelTwo and DocumentUnion."""

    async def test_multi_model(self):
        """Check that each model only reads and updates its own documents."""
        doc_1 = await DocumentMultiModelOne().insert()
        doc_2 = await DocumentMultiModelTwo().insert()

        # Each document is retrievable through the model that inserted it.
        new_doc_1 = await DocumentMultiModelOne.get(doc_1.id)
        new_doc_2 = await DocumentMultiModelTwo.get(doc_2.id)

        assert new_doc_1 is not None
        assert new_doc_2 is not None

        # Looking an id up through the *other* model must find nothing.
        new_doc_1 = await DocumentMultiModelTwo.get(doc_1.id)
        new_doc_2 = await DocumentMultiModelOne.get(doc_2.id)

        assert new_doc_1 is None
        assert new_doc_2 is None

        # An unfiltered find on each model returns only that model's document.
        new_docs_1 = await DocumentMultiModelOne.find({}).to_list()
        new_docs_2 = await DocumentMultiModelTwo.find({}).to_list()

        assert len(new_docs_1) == 1
        assert len(new_docs_2) == 1

        await DocumentMultiModelOne.update_all({"$set": {"shared": 100}})

        new_doc_1 = await DocumentMultiModelOne.get(doc_1.id)
        new_doc_2 = await DocumentMultiModelTwo.get(doc_2.id)

        # Guard the attribute reads below so a missing document is reported
        # as an assertion failure rather than an AttributeError on None.
        assert new_doc_1 is not None
        assert new_doc_2 is not None

        # The bulk update issued through model One must not reach model Two.
        assert new_doc_1.shared == 100
        assert new_doc_2.shared == 0

    async def test_union_doc(self):
        """Check that DocumentUnion.all() yields instances of the right model."""
        await DocumentMultiModelOne().insert()
        await DocumentMultiModelTwo().insert()
        await DocumentMultiModelOne().insert()
        await DocumentMultiModelTwo().insert()

        # NOTE(review): the expected order mirrors the insertion order above;
        # no explicit sort is requested from the query.
        expected_types = (
            DocumentMultiModelOne,
            DocumentMultiModelTwo,
            DocumentMultiModelOne,
            DocumentMultiModelTwo,
        )

        docs = await DocumentUnion.all().to_list()

        # Pin the size first: a short result should fail as an assertion (not
        # an IndexError) and unexpected extra documents should not go unseen.
        assert len(docs) == len(expected_types)
        for doc, expected_type in zip(docs, expected_types):
            assert isinstance(doc, expected_type)

    async def test_union_doc_aggregation(self):
        """Check that an aggregation run through DocumentUnion filters documents."""
        await DocumentMultiModelOne().insert()
        await DocumentMultiModelTwo().insert()
        await DocumentMultiModelOne().insert()
        await DocumentMultiModelTwo().insert()

        docs = await DocumentUnion.aggregate(
            [{"$match": {"$expr": {"$eq": ["$int_filed", 0]}}}]
        ).to_list()
        # Only two of the four inserted documents are expected to match;
        # presumably just one of the models defines `int_filed` -- verify
        # against tests.odm.models.
        assert len(docs) == 2

    async def test_union_doc_link(self):
        """Check that a fetched link resolves to a DocumentMultiModelOne."""
        doc_1 = await DocumentMultiModelOne().insert()
        await DocumentMultiModelTwo(linked_doc=doc_1).insert()

        docs = await DocumentMultiModelTwo.find({}, fetch_links=True).to_list()

        # Exactly one document of model Two was inserted; assert that before
        # indexing so an empty result fails clearly instead of IndexError.
        assert len(docs) == 1
        assert isinstance(docs[0].linked_doc, DocumentMultiModelOne)