"""Benchmarks for odmantic ``AIOEngine`` operations on ``SmallJournal`` documents."""
import pytest
from odmantic import AIOEngine
from .models import VALID_LEVELS, SmallJournal
# Marks that pytest applies to every test collected from this module.
pytestmark = [
    # All tests below are coroutine functions (``async def``).
    pytest.mark.asyncio,
    # Unconditional skip; the string is the reason shown in the test report.
    pytest.mark.skip("@benchmark does not support async functions yet"),
]
@pytest.fixture(params=(10, 50, 100))
def count(request) -> int:
    """Provide the number of documents a benchmark should work with.

    The fixture is parametrized, so every test requesting ``count`` is run
    once for each of the values 10, 50 and 100.
    """
    document_count = request.param
    return document_count
async def test_insert_small_single(benchmark, aio_engine: AIOEngine, count: int):
    """Benchmark saving ``count`` journals with one ``save`` call per instance."""
    # Instances are generated up front, outside the benchmarked callable.
    journals = [*SmallJournal.get_random_instances("test_write_small", count)]

    async def save_one_by_one():
        for journal in journals:
            await aio_engine.save(journal)

    # Explicit-call form of using ``benchmark`` as a decorator.
    benchmark(save_one_by_one)
async def test_write_small_bulk(benchmark, aio_engine: AIOEngine, count: int):
    """Benchmark saving ``count`` journals with a single ``save_all`` call."""
    # Instances are generated up front, outside the benchmarked callable.
    journals = [*SmallJournal.get_random_instances("test_write_small", count)]

    async def save_in_bulk():
        await aio_engine.save_all(journals)

    # Explicit-call form of using ``benchmark`` as a decorator.
    benchmark(save_in_bulk)
async def test_filter_by_level_small(benchmark, aio_engine: AIOEngine, count: int):
    """Benchmark one ``find`` query per entry of ``VALID_LEVELS``.

    ``count`` journals are saved before the benchmarked callable is defined,
    so the insertion is not part of the benchmarked function.
    """
    journals = [*SmallJournal.get_random_instances("test_write_small", count)]
    await aio_engine.save_all(journals)

    async def query_every_level():
        # Running sum of the result sizes; it is accumulated but not returned.
        found = 0
        for level in VALID_LEVELS:
            matches = await aio_engine.find(SmallJournal, SmallJournal.level == level)
            found += len(matches)

    # Explicit-call form of using ``benchmark`` as a decorator.
    benchmark(query_every_level)
async def test_filter_limit_skip_by_level_small(
    benchmark, aio_engine: AIOEngine, count: int
):
    """Benchmark paginated ``find`` queries, one per entry of ``VALID_LEVELS``.

    Each query filters on ``SmallJournal.level`` and passes ``limit=20`` and
    ``skip=20``. ``count`` journals are saved before the benchmarked callable
    is defined, so the insertion is not part of the benchmarked function.
    """
    journals = [*SmallJournal.get_random_instances("test_write_small", count)]
    await aio_engine.save_all(journals)

    # NOTE(review): for count=10 this test inserts fewer than 20 documents, so
    # skip=20 presumably yields empty pages unless the collection already
    # holds other data -- confirm this is intended.
    async def query_page_for_every_level():
        # Running sum of the page sizes; it is accumulated but not returned.
        found = 0
        for level in VALID_LEVELS:
            page = await aio_engine.find(
                SmallJournal, SmallJournal.level == level, limit=20, skip=20
            )
            found += len(page)

    # Explicit-call form of using ``benchmark`` as a decorator.
    benchmark(query_page_for_every_level)
async def test_find_one_by_id(benchmark, aio_engine: AIOEngine, count: int):
    """Benchmark one ``find_one`` lookup by ``id`` for each saved journal."""
    journals = [*SmallJournal.get_random_instances("test_write_small", count)]
    await aio_engine.save_all(journals)

    # Ids are read after the save, outside the benchmarked callable.
    journal_ids = []
    for journal in journals:
        journal_ids.append(journal.id)

    async def fetch_each_by_id():
        for journal_id in journal_ids:
            await aio_engine.find_one(SmallJournal, SmallJournal.id == journal_id)

    # Explicit-call form of using ``benchmark`` as a decorator.
    benchmark(fetch_each_by_id)
|