File: t_install.py

package info (click to toggle)
python-pyomop 4.3.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 536 kB
  • sloc: python: 1,374; sh: 37; makefile: 26
file content (59 lines) | stat: -rw-r--r-- 1,821 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from pyomop import CdmEngineFactory, CdmVocabulary, CdmVector, Cohort, Vocabulary, metadata
from sqlalchemy.future import select
import datetime
import asyncio

async def main():
    """Exercise a pyomop installation end to end against the default database.

    Creates the tables, inserts one ``Cohort`` row, reads it back with a
    ``select`` statement and with ``session.get``, hands the query result to
    ``CdmVector`` and runs two ``sql_df`` queries, printing what it finds
    along the way. The engine is disposed even when a step fails.

    Raises:
        AssertionError: If a row read back does not have ``subject_id == 100``
            or the primary-key lookup returns nothing.
    """
    cdm = CdmEngineFactory()  # Creates SQLite database by default
    # Postgres example (db='mysql' also supported)
    # cdm = CdmEngineFactory(db='pgsql', host='', port=5432,
    #                       user='', pw='',
    #                       name='', schema='cdm6')

    # Grab the engine first so the ``finally`` below can always dispose it.
    engine = cdm.engine
    try:
        # Create Tables if required
        await cdm.init_models(metadata)
        # Create vocabulary if required
        vocab = CdmVocabulary(cdm)
        # vocab.create_vocab('/path/to/csv/files')  # Uncomment to load vocabulary csv files

        # Use a single timestamp for both bounds: two separate now() calls
        # would leave the start date marginally later than the end date.
        now = datetime.datetime.now()

        # All session work stays inside the context manager, which closes the
        # session on exit (including when an assertion below fails).
        async with cdm.session() as session:
            # Add a cohort; session.begin() commits when its block exits
            # without an error, so no explicit commit is required.
            async with session.begin():
                session.add(Cohort(cohort_definition_id=2, subject_id=100,
                                   cohort_end_date=now,
                                   cohort_start_date=now))

            # Query the cohort
            stmt = select(Cohort).where(Cohort.subject_id == 100)
            result = await session.execute(stmt)
            for row in result.scalars():
                print(row)
                assert row.subject_id == 100

            # Query the cohort pattern 2
            # NOTE(review): assumes the row inserted above got primary key 1,
            # i.e. the database was empty beforehand - confirm.
            cohort = await session.get(Cohort, 1)
            print(cohort)
            assert cohort is not None
            assert cohort.subject_id == 100

            # Convert result to a pandas dataframe
            # NOTE(review): `result` was already iterated by the loop above;
            # confirm CdmVector copes with a consumed result.
            vec = CdmVector()
            vec.result = result
            print(vec.df.dtypes)

            result = await vec.sql_df(cdm, 'TEST')  # TEST is defined in sqldict.py
            for row in result:
                print(row)

            result = await vec.sql_df(cdm, query='SELECT * from cohort')
            for row in result:
                print(row)
    finally:
        # Release the engine's connections whether or not the checks passed.
        await engine.dispose()

# Run the main coroutine to completion with asyncio.run().
# There is no __main__ guard, so this also executes when the module is imported.
asyncio.run(main())