File: cursor.py

package info (click to toggle)
python-beanie 2.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,484 kB
  • sloc: python: 14,427; makefile: 6; sh: 6
file content (79 lines) | stat: -rw-r--r-- 2,110 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from abc import abstractmethod
from typing import (
    Any,
    Dict,
    Generic,
    List,
    Optional,
    Type,
    TypeVar,
    cast,
)

from pydantic.main import BaseModel

from beanie.odm.utils.parsing import parse_obj

# Type of the items a cursor query produces: the raw item from the underlying
# cursor when no projection model is set, otherwise whatever `parse_obj`
# returns for that item.
CursorResultType = TypeVar("CursorResultType")


class BaseCursorQuery(Generic[CursorResultType]):
    """
    Base class for queries whose results are read through an async cursor.

    Subclasses supply the cursor (`get_cursor`), the optional projection
    model (`get_projection_model`) and the cache hooks (`_get_cache`,
    `_set_cache`). This class adds two ways of consuming the results:
    async iteration (`async for item in query`) and `to_list`. In both,
    each raw item is passed through `parse_obj` when a projection model
    is set, and returned untouched otherwise.
    """

    # Cursor used by async iteration; created on the first `__anext__` call.
    cursor = None
    # Forwarded to `parse_obj` as its `lazy_parse` argument.
    lazy_parse = False

    @abstractmethod
    def get_projection_model(self) -> Optional[Type[BaseModel]]:
        """Return the model used to parse raw items, or None to skip parsing."""
        ...

    @abstractmethod
    async def get_cursor(self):
        """Create and return the underlying async cursor for this query."""
        ...

    def _cursor_params(self): ...

    def __aiter__(self):
        return self

    async def __anext__(self) -> CursorResultType:
        """
        Return the next item of the query, parsed with the projection model.

        The cursor is created on the first call and reused afterwards.
        The exception raised by the cursor's own `__anext__` when it is
        exhausted is propagated unchanged, which ends the `async for` loop.
        """
        if self.cursor is None:
            self.cursor = await self.get_cursor()
        next_item = await self.cursor.__anext__()
        projection = self.get_projection_model()
        if projection is None:
            return next_item
        return parse_obj(projection, next_item, lazy_parse=self.lazy_parse)  # type: ignore

    @abstractmethod
    def _get_cache(self) -> Optional[List[Dict[str, Any]]]:
        """Return the cached raw items, or None when nothing is cached."""
        ...

    @abstractmethod
    def _set_cache(self, data):
        """Store the raw items fetched by `to_list` in the cache."""
        ...

    async def to_list(
        self, length: Optional[int] = None
    ) -> List[CursorResultType]:  # noqa
        """
        Get list of documents

        Cached raw items are used when `_get_cache` returns anything other
        than None (an empty list counts as a hit). Otherwise the items are
        read from a fresh cursor and handed to `_set_cache`.

        :param length: Optional[int] - length of the list, passed to the
            cursor's `to_list`; not applied to items served from the cache
        :return: Union[List[BaseModel], List[Dict[str, Any]]]
        """
        pymongo_list: Optional[List[Dict[str, Any]]] = self._get_cache()

        if pymongo_list is None:
            # Open the cursor only on a cache miss: creating it may already
            # involve a round trip to the database, which a cache hit is
            # supposed to avoid.
            cursor = await self.get_cursor()
            pymongo_list = await cursor.to_list(length)
            self._set_cache(pymongo_list)

        projection = self.get_projection_model()
        if projection is None:
            return cast(List[CursorResultType], pymongo_list)
        return cast(
            List[CursorResultType],
            [
                parse_obj(projection, i, lazy_parse=self.lazy_parse)
                for i in pymongo_list
            ],
        )