File: test_regression_233.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (75 lines) | stat: -rw-r--r-- 1,774 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import pytest

from parsl.app.app import python_app
from parsl.dataflow.dflow import DataFlowKernel


def run_checkpointed(checkpoints):
    # set_stream_logger()
    from parsl.tests.configs.local_threads_checkpoint_task_exit import config
    config.checkpoint_files = checkpoints
    dfk = DataFlowKernel(config=config)

    @python_app(data_flow_kernel=dfk, cache=True)
    def cached_rand(x):
        import random
        return random.randint(0, 10000)

    items = []
    for i in range(0, 5):
        x = cached_rand(i)
        items.append(x)

    # wait for all of the results
    results = [i.result() for i in items]

    dfk.cleanup()
    return results, dfk.run_dir


def run_race(sleep_dur):

    from parsl.tests.configs.local_threads_checkpoint_dfk_exit import config
    dfk = DataFlowKernel(config=config)

    @python_app(data_flow_kernel=dfk, cache=True)
    def cached_rand(x, sleep_dur=0):
        import random
        import time
        time.sleep(sleep_dur)
        return random.randint(0, 10000)

    items = []
    for i in range(0, 5):
        x = cached_rand(i, sleep_dur=sleep_dur)
        items.append(x)

    dfk.cleanup()
    return [i.result for i in items]


@pytest.mark.local
def test_regress_234():
    """Test task_exit checkpointing with fast tasks"""
    run_race(0)


@pytest.mark.local
def test_slower_apps():
    """Test task_exit tests with slow apps"""
    run_race(0.5)


@pytest.mark.local
def test_checkpoint_availability():
    import os

    original, run_dir = run_checkpointed([])
    last_checkpoint = os.path.join(run_dir, 'checkpoint')
    print(last_checkpoint)
    cached, _ = run_checkpointed([last_checkpoint])

    print(cached)
    print(original)

    assert cached == original, "All tasks were not cached"