File: test_basic.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (133 lines) | stat: -rw-r--r-- 4,592 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import time

import pytest

import parsl
from parsl import HighThroughputExecutor
from parsl.config import Config
from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig
from parsl.monitoring import MonitoringHub


@parsl.python_app
def this_app():
    """Sleep long enough for resource monitoring to report, then return 5."""
    # The pause has to span several resource monitoring periods (as set
    # in the test configuration): a short-lived app is not guaranteed to
    # produce any resource message after the first one.
    pause = 3
    time.sleep(pause)

    return 5


# The config functions below are for use in parametrization; each should
# return a configuration that is suitably set up for monitoring.

def htex_config():
    """Return the htex_local_alternate test config unchanged.

    The executor keeps htex's default, htex-specific monitoring radio mode.
    """
    from parsl.tests.configs.htex_local_alternate import fresh_config as make_htex_config

    config = make_htex_config()
    return config


def htex_udp_config():
    """Return the htex_local_alternate test config with its executor forced onto the UDP radio."""
    from parsl.tests.configs.htex_local_alternate import fresh_config as make_htex_config

    config = make_htex_config()
    assert len(config.executors) == 1

    # Check the starting radio mode before overriding it, so that a change
    # in the base config's default is noticed here rather than silently
    # masked by the override.
    executor = config.executors[0]
    assert executor.radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio"
    executor.radio_mode = "udp"

    return config


def htex_filesystem_config():
    """Return the htex_local_alternate test config with its executor forced onto the filesystem radio."""
    from parsl.tests.configs.htex_local_alternate import fresh_config as make_htex_config

    config = make_htex_config()
    assert len(config.executors) == 1

    # Check the starting radio mode before overriding it, so that a change
    # in the base config's default is noticed here rather than silently
    # masked by the override.
    executor = config.executors[0]
    assert executor.radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio"
    executor.radio_mode = "filesystem"

    return config


def workqueue_config():
    """Return the workqueue_ex test config with a MonitoringHub attached."""
    from parsl.tests.configs.workqueue_ex import fresh_config as make_workqueue_config

    config = make_workqueue_config()

    # Replace the base config's monitoring setting with a localhost hub.
    hub = MonitoringHub(hub_address="localhost", resource_monitoring_interval=1)
    config.monitoring = hub

    return config


def taskvine_config():
    """Return a Config with a single TaskVineExecutor and a MonitoringHub.

    The TaskVine manager is configured with port 9000, the executor uses
    worker_launch_method='provider', and the hub is addressed at localhost
    with a resource monitoring interval of 1.
    """
    manager = TaskVineManagerConfig(port=9000)
    executor = TaskVineExecutor(manager_config=manager,
                                worker_launch_method='provider')
    hub = MonitoringHub(hub_address="localhost",
                        resource_monitoring_interval=1)

    return Config(executors=[executor], monitoring=hub)


@pytest.mark.local
@pytest.mark.parametrize("fresh_config", [htex_config, htex_filesystem_config, htex_udp_config, workqueue_config, taskvine_config])
def test_row_counts(tmpd_cwd, fresh_config):
    """Run one app with monitoring enabled and check the database row counts.

    The test loads the parametrized configuration, runs ``this_app`` once,
    and then opens the SQLite monitoring database to check how many rows
    each monitoring table contains.

    Args:
        tmpd_cwd: Directory used both as the parsl ``run_dir`` and as the
            location of ``monitoring.db``. Presumably a temporary-directory
            fixture defined outside this file - confirm against conftest.
        fresh_config: Zero-argument callable returning a configuration
            whose ``monitoring`` attribute is set.
    """
    # sqlalchemy is imported here rather than at module level because
    # it isn't available in a plain parsl install, so this module
    # would otherwise fail to import and break even a basic test
    # run.
    import sqlalchemy
    from sqlalchemy import text

    db_url = f"sqlite:///{tmpd_cwd}/monitoring.db"

    config = fresh_config()
    config.run_dir = tmpd_cwd
    config.monitoring.logging_endpoint = db_url

    with parsl.load(config):
        assert this_app().result() == 5

    # At this point parsl has been unloaded, so the monitoring database
    # should describe exactly one workflow that ran exactly one task once.

    engine = sqlalchemy.create_engine(db_url)
    try:
        with engine.begin() as connection:

            def count(query):
                """Run a single-value COUNT query and return that value."""
                (value, ) = connection.execute(text(query)).first()
                return value

            assert count("SELECT COUNT(*) FROM workflow") == 1

            assert count("SELECT COUNT(*) FROM task") == 1

            assert count("SELECT COUNT(*) FROM try") == 1

            # No task that reached exec_done may be missing its
            # task_try_time_running value.
            assert count("SELECT COUNT(*) FROM status, try "
                         "WHERE status.task_id = try.task_id "
                         "AND status.task_status_name='exec_done' "
                         "AND task_try_time_running is NULL") == 0

            if isinstance(config.executors[0], HighThroughputExecutor):
                # The node table is specific to the HighThroughputExecutor
                # Two entries: one showing manager active, one inactive
                assert count("SELECT COUNT(*) FROM node") == 2

            # At least two rows are required in the block table.
            # local provider has a status_polling_interval of 5s
            # NOTE(review): this comment previously read "There should be
            # one block polling status", which does not match the >= 2
            # assertion; presumably one block is recorded in more than one
            # status - confirm.
            assert count("SELECT COUNT(*) FROM block") >= 2

            assert count("SELECT COUNT(*) FROM resource") >= 1
    finally:
        # Release pooled connections so the SQLite file is not held open
        # across the remaining parametrized runs.
        engine.dispose()