1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
from abc import abstractmethod
from typing import (
Any,
Dict,
Generic,
List,
Optional,
Type,
TypeVar,
cast,
)
from pydantic.main import BaseModel
from beanie.odm.utils.parsing import parse_obj
# Type of the items a cursor query yields: either the raw item coming from
# the underlying cursor or the result of parsing it with a projection model.
CursorResultType = TypeVar("CursorResultType")


class BaseCursorQuery(Generic[CursorResultType]):
    """
    BaseCursorQuery class. Wrapper over AsyncCursor,
    which parses results with the projection model.

    Subclasses supply the cursor (``get_cursor``), the projection model
    (``get_projection_model``) and the cache accessors (``_get_cache`` /
    ``_set_cache``). This base class implements async iteration and
    ``to_list`` on top of them.
    """

    # Created lazily by ``__anext__`` on the first iteration step.
    cursor = None
    # Forwarded to ``parse_obj`` as its ``lazy_parse`` argument.
    lazy_parse = False

    @abstractmethod
    def get_projection_model(self) -> Optional[Type[BaseModel]]:
        """
        Return the model used to parse items, or None to return the
        items exactly as the cursor produced them.
        """
        ...

    @abstractmethod
    async def get_cursor(self):
        """Return the underlying async cursor for this query."""
        ...

    def _cursor_params(self):
        """Hook with no implementation in this base class (returns None)."""
        ...

    def __aiter__(self):
        """Return self: the query object is its own async iterator."""
        return self

    async def __anext__(self) -> CursorResultType:
        """
        Fetch the next item from the cursor, creating the cursor first
        if this is the first step.

        The item is parsed with the projection model when one is set;
        otherwise it is returned unchanged. Exceptions raised by the
        cursor's own ``__anext__`` (including the end-of-iteration
        signal) propagate to the caller.
        """
        if self.cursor is None:
            self.cursor = await self.get_cursor()
        next_item = await self.cursor.__anext__()
        projection = self.get_projection_model()
        if projection is None:
            return next_item
        return parse_obj(projection, next_item, lazy_parse=self.lazy_parse)  # type: ignore

    @abstractmethod
    def _get_cache(self) -> Optional[List[Dict[str, Any]]]:
        """
        Return the cached raw result list, or None when nothing is
        cached (``to_list`` treats None as a cache miss).
        """
        ...

    @abstractmethod
    def _set_cache(self, data):
        """Store the raw result list ``data`` in the cache."""
        ...

    async def to_list(
        self, length: Optional[int] = None
    ) -> List[CursorResultType]:  # noqa
        """
        Get list of documents

        :param length: Optional[int] - length of the list
        :return: Union[List[BaseModel], List[Dict[str, Any]]]
        """
        pymongo_list: Optional[List[Dict[str, Any]]] = self._get_cache()
        if pymongo_list is None:
            # Build the cursor only on a cache miss: a cached result makes
            # the cursor unnecessary, and creating one may be costly
            # depending on the subclass's get_cursor implementation.
            cursor = await self.get_cursor()
            pymongo_list = await cursor.to_list(length)
            self._set_cache(pymongo_list)

        projection = self.get_projection_model()
        if projection is None:
            return cast(List[CursorResultType], pymongo_list)
        return cast(
            List[CursorResultType],
            [
                parse_obj(projection, i, lazy_parse=self.lazy_parse)
                for i in pymongo_list
            ],
        )
|