File: test_incomplete_futures.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (66 lines) | stat: -rw-r--r-- 2,005 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import logging
import os
import random
from concurrent.futures import Future

import pytest

import parsl


@parsl.python_app
def this_app(inputs=()):
    return inputs[0]


@pytest.mark.local
def test_future_representation(tmpd_cwd):
    import sqlalchemy
    from sqlalchemy import text

    from parsl.tests.configs.htex_local_alternate import fresh_config

    monitoring_db = str(tmpd_cwd / "monitoring.db")
    monitoring_url = "sqlite:///" + monitoring_db

    c = fresh_config()
    c.monitoring.logging_endpoint = monitoring_url
    c.run_dir = tmpd_cwd

    parsl.load(c)

    # we're going to pass this TOKEN into an app via a pre-requisite Future,
    # and then expect to see it appear in the monitoring database.
    TOKEN = random.randint(0, 1000000)

    # make a Future that has no result yet
    # invoke a task that depends on it
    # inspect and insert something about the monitoring recorded value of that Future
    # make the Future complete
    # inspect and insert something about the monitoring recorded value of that Future

    f1 = Future()

    f2 = this_app(inputs=[f1])

    f1.set_result(TOKEN)

    assert f2.result() == TOKEN

    # this cleanup gives a barrier that allows the monitoring code to store
    # everything it has in the database - without this, there's a race
    # condition that the task will not have arrived in the database yet.
    # A different approach for this test might be to poll the DB for a few
    # seconds, with the assumption "data will arrive in the DB within
    # 30 seconds, but probably much sooner".
    parsl.dfk().cleanup()

    engine = sqlalchemy.create_engine(monitoring_url)
    with engine.begin() as connection:
        result = connection.execute(text("SELECT COUNT(*) FROM task"))
        (task_count, ) = result.first()
        assert task_count == 1

        result = connection.execute(text("SELECT task_inputs FROM task"))
        (task_inputs, ) = result.first()
        assert task_inputs == "[" + repr(TOKEN) + "]"