File: test_timesries_collection.py

package info (click to toggle)
python-beanie 2.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,480 kB
  • sloc: python: 14,427; makefile: 7; sh: 6
file content (41 lines) | stat: -rw-r--r-- 1,282 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import pytest

from beanie import init_beanie
from beanie.exceptions import MongoDBVersionError
from tests.odm.models import DocumentWithTimeseries


async def test_timeseries_collection(db):
    """Check how ``init_beanie`` handles ``DocumentWithTimeseries``.

    The server's major version is read from the ``buildInfo`` command.
    When it is below 5, initialisation is expected to raise
    ``MongoDBVersionError``. Otherwise initialisation must succeed and
    the ``listCollections`` entry for ``DocumentWithTimeseries`` must
    equal the expected time-series description exactly.

    Args:
        db: Database handle supplied by the test fixtures; it must offer
            an awaitable ``command`` method.
    """
    server_info = await db.command({"buildInfo": 1})
    # Only the leading component of the dotted version string matters.
    major_text, _, _ = server_info["version"].partition(".")
    timeseries_supported = int(major_text) >= 5

    if not timeseries_supported:
        with pytest.raises(MongoDBVersionError):
            await init_beanie(
                database=db, document_models=[DocumentWithTimeseries]
            )
        return

    await init_beanie(database=db, document_models=[DocumentWithTimeseries])

    list_command = {
        "listCollections": 1,
        "filter": {"name": "DocumentWithTimeseries"},
    }
    listing = await db.command(list_command)
    actual_description = listing["cursor"]["firstBatch"][0]

    expected_timeseries = {
        "timeField": "ts",
        "granularity": "seconds",
        "bucketMaxSpanSeconds": 3600,
    }
    expected_description = {
        "name": "DocumentWithTimeseries",
        "type": "timeseries",
        "options": {
            "expireAfterSeconds": 2,
            "timeseries": expected_timeseries,
        },
        "info": {"readOnly": False},
    }
    assert actual_description == expected_description