File: test_stdouterr.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (134 lines) | stat: -rw-r--r-- 4,492 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Tests monitoring records app name under various decoration patterns.
"""

import logging
import os
import re
import time
from typing import Union

import pytest

import parsl
from parsl.config import Config
from parsl.data_provider.data_manager import default_staging
from parsl.data_provider.files import File
from parsl.data_provider.staging import Staging
from parsl.executors import HighThroughputExecutor
from parsl.monitoring import MonitoringHub
from parsl.providers import LocalProvider


def fresh_config(run_dir):
    return Config(
        run_dir=str(run_dir),
        executors=[
            HighThroughputExecutor(
                address="127.0.0.1",
                label="htex_Local",
                provider=LocalProvider(
                    init_blocks=1,
                    min_blocks=1,
                    max_blocks=1,
                )
            )
        ],
        strategy='simple',
        strategy_period=0.1,
        monitoring=MonitoringHub(
                        hub_address="localhost",
        )
    )


@parsl.python_app
def stdapp(stdout=None, stderr=None):
    pass


class ArbitraryPathLike(os.PathLike):
    def __init__(self, path: Union[str, bytes]) -> None:
        self.path = path

    def __fspath__(self) -> Union[str, bytes]:
        return self.path


class ArbitraryStaging(Staging):
    """This staging provider will not actually do any staging, but will
    accept arbitrary: scheme URLs. That's enough for this monitoring test
    which doesn't need any actual stage out action to happen.
    """
    def can_stage_out(self, file):
        return file.scheme == "arbitrary"


@pytest.mark.local
@pytest.mark.parametrize('stdx,expected_stdx',
                         [('hello.txt', 'hello.txt'),
                          (None, ''),
                          (('tuple.txt', 'w'), 'tuple.txt'),
                          (ArbitraryPathLike('pl.txt'), 'pl.txt'),
                          (ArbitraryPathLike(b'pl2.txt'), 'pl2.txt'),
                          ((ArbitraryPathLike('pl3.txt'), 'w'), 'pl3.txt'),
                          ((ArbitraryPathLike(b'pl4.txt'), 'w'), 'pl4.txt'),
                          (parsl.AUTO_LOGNAME,
                              lambda p:
                              isinstance(p, str) and
                              os.path.isabs(p) and
                              re.match("^.*/task_0000_stdapp\\.std...$", p)),
                          (File("arbitrary:abc123"), "arbitrary:abc123"),
                          (File("file:///tmp/pl5"), "file:///tmp/pl5"),
                          ])
@pytest.mark.parametrize('stream', ['stdout', 'stderr'])
def test_stdstream_to_monitoring(stdx, expected_stdx, stream, tmpd_cwd, caplog):
    """This tests that various forms of stdout/err specification are
       represented in monitoring correctly. The stderr and stdout codepaths
       are generally duplicated, rather than factorised, and so this test
       runs the same tests on both stdout and stderr.
    """

    # this is imported here rather than at module level because
    # it isn't available in a plain parsl install, so this module
    # would otherwise fail to import and break even a basic test
    # run.
    import sqlalchemy

    c = fresh_config(tmpd_cwd)
    c.monitoring.logging_endpoint = f"sqlite:///{tmpd_cwd}/monitoring.db"
    c.executors[0].storage_access = default_staging + [ArbitraryStaging()]

    with parsl.load(c):
        kwargs = {stream: stdx}
        stdapp(**kwargs).result()

    engine = sqlalchemy.create_engine(c.monitoring.logging_endpoint)
    with engine.begin() as connection:

        def count_rows(table: str):
            result = connection.execute(sqlalchemy.text(f"SELECT COUNT(*) FROM {table}"))
            (c, ) = result.first()
            return c

        # one workflow...
        assert count_rows("workflow") == 1

        # ... with one task ...
        assert count_rows("task") == 1

        # ... that was tried once ...
        assert count_rows("try") == 1

        # ... and has the expected name.
        result = connection.execute(sqlalchemy.text(f"SELECT task_{stream} FROM task"))
        (c, ) = result.first()

        if isinstance(expected_stdx, str):
            assert c == expected_stdx
        elif callable(expected_stdx):
            assert expected_stdx(c)
        else:
            raise RuntimeError("Bad expected_stdx value")

    for record in caplog.records:
        assert record.levelno < logging.ERROR