File: test_memoization_representation.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (83 lines) | stat: -rw-r--r-- 2,645 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83

import logging
import os

import pytest

import parsl

logger = logging.getLogger(__name__)


@parsl.python_app(cache=True)
def this_app(x):
    return x + 1


@pytest.mark.local
def test_hashsum():
    import sqlalchemy
    from sqlalchemy import text

    from parsl.tests.configs.htex_local_alternate import fresh_config

    if os.path.exists("runinfo/monitoring.db"):
        logger.info("Monitoring database already exists - deleting")
        os.remove("runinfo/monitoring.db")

    logger.info("loading parsl")
    parsl.load(fresh_config())

    logger.info("invoking and waiting for result (1/4)")
    f1 = this_app(4)
    assert f1.result() == 5

    logger.info("invoking and waiting for result (2/4)")
    f2 = this_app(17)
    assert f2.result() == 18

    logger.info("invoking and waiting for result (3/4)")
    f3 = this_app(4)
    assert f3.result() == 5

    logger.info("invoking and waiting for result (4/4)")
    f4 = this_app(4)
    assert f4.result() == 5

    assert f1.task_record['hashsum'] == f3.task_record['hashsum']
    assert f1.task_record['hashsum'] == f4.task_record['hashsum']
    assert f1.task_record['hashsum'] != f2.task_record['hashsum']

    logger.info("cleaning up parsl")
    parsl.dfk().cleanup()

    # at this point, we should find one row in the monitoring database.

    logger.info("checking database content")
    engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db")
    with engine.begin() as connection:

        # we should have three tasks, but with only two tries, because the
        # memo try should be missing
        result = connection.execute(text("SELECT COUNT(*) FROM task"))
        (task_count, ) = result.first()
        assert task_count == 4

        # this will check that the number of task rows for each hashsum matches the above app invocations
        result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f1.task_record['hashsum']}'"))
        (hashsum_count, ) = result.first()
        assert hashsum_count == 3

        result = connection.execute(text(f"SELECT COUNT(task_hashsum) FROM task WHERE task_hashsum='{f2.task_record['hashsum']}'"))
        (hashsum_count, ) = result.first()
        assert hashsum_count == 1

        result = connection.execute(text("SELECT COUNT(*) FROM status WHERE task_status_name='exec_done'"))
        (memo_count, ) = result.first()
        assert memo_count == 2

        result = connection.execute(text("SELECT COUNT(*) FROM status WHERE task_status_name='memo_done'"))
        (memo_count, ) = result.first()
        assert memo_count == 2

    logger.info("all done")