File: async_stuff.py

package info (click to toggle)
sqlalchemy 2.0.45+ds1-1
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 26,868 kB
  • sloc: python: 416,938; makefile: 231; sh: 7
file content (39 lines) | stat: -rw-r--r-- 1,166 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from asyncio import current_task

from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine


# Async engine shared by every object created below.
# NOTE(review): the URL is an empty string, so this module presumably is not
# meant to be run against a real database (looks like a type-checking
# fixture) -- confirm before relying on it at runtime.
engine = create_async_engine("")
# Session factory bound to ``engine``; ``class_`` selects AsyncSession as the
# session class it produces.
SM = async_sessionmaker(engine, class_=AsyncSession)

# A single AsyncSession constructed directly on ``engine`` (not via ``SM``).
async_session = AsyncSession(engine)

# Scoped-session wrapper built from the ``SM`` factory, with
# ``asyncio.current_task`` passed as its second argument.
as_session = async_scoped_session(SM, current_task)


async def go() -> None:
    """Call the scalar helpers on both session objects and on a connection.

    Awaits ``scalars`` and ``stream_scalars`` on ``async_session``, on
    ``as_session`` and on a connection obtained from ``engine``, consuming
    each result. Afterwards it reads ``is_active`` on the session's current
    and nested transactions when those are truthy. Nothing is returned.
    """
    # Directly constructed session: scalars(), then stream_scalars().
    scalar_result = await async_session.scalars(text("select 1"), params=[])
    scalar_result.first()
    scalar_stream = await async_session.stream_scalars(
        text("select 1"), params=[]
    )
    await scalar_stream.all()

    # Scoped session: the same two calls, reusing the same local names.
    scalar_result = await as_session.scalars(text("select 1"), params=[])
    scalar_result.first()
    scalar_stream = await as_session.stream_scalars(
        text("select 1"), params=[]
    )
    await scalar_stream.all()

    # Connection-level variants (called without ``params``).
    async with engine.connect() as conn:
        conn_result = await conn.scalars(text("select 1"))
        conn_result.first()
        conn_stream = await conn.stream_scalars(text("select 1"))
        await conn_stream.all()

    # Transaction accessors: the attribute is only read, its value discarded.
    current_txn = async_session.get_transaction()
    if current_txn:
        current_txn.is_active

    nested_txn = async_session.get_nested_transaction()
    if nested_txn:
        nested_txn.is_active