File: lifespan.py

package info (click to toggle)
ormar 0.22.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,952 kB
  • sloc: python: 24,085; makefile: 34; sh: 14
file content (71 lines) | stat: -rw-r--r-- 2,305 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from contextlib import asynccontextmanager
from typing import AsyncIterator

import pytest_asyncio
import sqlalchemy
from fastapi import FastAPI
from ormar import OrmarConfig
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

from tests.settings import ASYNC_DATABASE_URL


def lifespan(config):
    """Build an async lifespan context manager bound to ``config.database``.

    On entry the returned context manager connects ``config.database`` unless
    it already reports ``is_connected``; on exit it disconnects the database
    if it is still connected.

    Args:
        config: Object exposing a ``database`` attribute that provides
            ``is_connected``, ``connect()`` and ``disconnect()``.

    Returns:
        A callable taking the application instance (unused) and returning an
        async context manager that yields ``None``.
    """

    @asynccontextmanager
    async def do_lifespan(_: FastAPI) -> AsyncIterator[None]:
        if not config.database.is_connected:
            await config.database.connect()

        try:
            yield
        finally:
            # An exception raised inside the ``async with`` body is re-raised
            # at the ``yield``; the ``finally`` guarantees the connection is
            # still released in that case instead of being leaked.
            if config.database.is_connected:
                await config.database.disconnect()

    return do_lifespan


def drop_tables(
    connection: sqlalchemy.Connection, config: OrmarConfig
):  # pragma: no cover
    """Drop every table registered in ``config.metadata`` using ``connection``.

    For any dialect other than PostgreSQL this delegates to
    ``config.metadata.drop_all``. On PostgreSQL each table is dropped
    individually, in reverse ``sorted_tables`` order, with
    ``DROP TABLE IF EXISTS ... CASCADE``.

    Args:
        connection: Synchronous SQLAlchemy connection to run the statements on.
        config: Ormar configuration whose ``metadata`` lists the tables.
    """
    if connection.dialect.name != "postgresql":
        config.metadata.drop_all(connection)
        return

    # NOTE(review): CASCADE presumably keeps leftover dependent objects from
    # blocking the drop -- confirm against the test-suite history.
    for table in reversed(config.metadata.sorted_tables):
        statement = text(f'DROP TABLE IF EXISTS "{table.name}" CASCADE')
        connection.execute(statement)


def init_tests(config, scope="module"):
    """Create an autouse pytest-asyncio fixture that manages the test schema.

    Before the tests run, the fixture drops and recreates all tables of
    ``config.metadata``; afterwards it drops them again and disposes the
    engine.

    Args:
        config: Configuration object exposing ``engine`` and ``metadata``.
        scope: Pytest fixture scope passed to ``pytest_asyncio.fixture``.

    Returns:
        The fixture function; the caller is expected to bind it to a name in
        the test module.
    """

    @pytest_asyncio.fixture(autouse=True, scope=scope)
    async def create_database():
        def rebuild_schema(sync_connection):  # pragma: no cover
            drop_tables(sync_connection, config)
            config.metadata.create_all(sync_connection)

        # Dropping and creating share one connection to avoid event loop
        # issues.
        async with config.engine.begin() as setup_conn:
            await setup_conn.run_sync(rebuild_schema)

        # Original author's note: asyncpg and aiomysql are strict about event
        # loops, so PostgreSQL and MySQL get a freshly created engine for the
        # duration of the tests.
        swap_engine = config.engine.dialect.name in ("postgresql", "mysql")
        if swap_engine:  # pragma: no cover
            await config.engine.dispose()
            config._original_engine = config.engine
            config.engine = create_async_engine(ASYNC_DATABASE_URL)

        yield

        # Undo the swap (if any) so teardown runs against the original engine.
        if hasattr(config, "_original_engine"):  # pragma: no cover
            await config.engine.dispose()
            config.engine = config._original_engine
            del config._original_engine

        async with config.engine.begin() as teardown_conn:
            await teardown_conn.run_sync(drop_tables, config)

        await config.engine.dispose()

    return create_database