File: test_regression_239.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (84 lines) | stat: -rw-r--r-- 2,441 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import pickle

import pytest

from parsl import DataFlowKernel, python_app
from parsl.utils import time_limited_open


def run_checkpointed(n=2, mode="task_exit"):
    """Run *n* apps that fail with a ZeroDivisionError, then one app that
    succeeds, under the given checkpoint *mode*.

    The checkpoint produced should contain only the single successful task.
    Returns the run directory of the DataFlowKernel that was used.
    """
    from parsl.tests.configs.local_threads import config

    config["globals"]["checkpointMode"] = mode
    dfk = DataFlowKernel(config=config)

    @python_app(data_flow_kernel=dfk, cache=True)
    def cached_rand(x):
        import random
        return random.randint(0, 10000)

    @python_app(data_flow_kernel=dfk, cache=True)
    def cached_failing(x):
        5 / 0
        return 1

    futures = []
    for _ in range(n):
        fut = cached_failing(0)
        futures.append(fut)
        try:
            fut.result()
        except Exception:
            # Failure is expected here; the point is that failed tasks
            # must not end up in the checkpoint.
            print("Ignoring failure of task")

    ok = cached_rand(1)
    print(ok.result())
    rundir = dfk.rundir
    # Call cleanup *only* for dfk_exit to ensure that a checkpoint is written
    # at all
    if mode == "dfk_exit":
        dfk.cleanup()
    return rundir


@pytest.mark.local
@pytest.mark.skip('hangs intermittently in pytest')
def test_regression_239():
    """Ensure failed tasks are not cached with task_exit mode. Tests #239
    Also tests task_exit behavior.
    """

    rundir = run_checkpointed()
    with time_limited_open("{}/checkpoint/tasks.pkl".format(rundir), 'rb', seconds=2) as f:
        tasks = []
        try:
            # Each pickle.load() consumes exactly one checkpoint record;
            # the stream ends by raising EOFError, which terminates the loop.
            while True:
                tasks.append(pickle.load(f))
        except EOFError:
            pass
        print("Tasks from cache : ", tasks)
        # Only the single successful task should have been checkpointed.
        assert len(tasks) == 1, "Expected {} checkpoint events, got {}".format(1, len(tasks))


@pytest.mark.local
@pytest.mark.skip('hangs intermittently in pytest')
def test_checkpointing_at_dfk_exit():
    """Ensure failed tasks are not cached with dfk_exit mode. Tests #239
    """

    rundir = run_checkpointed(mode="dfk_exit")
    with time_limited_open("{}/checkpoint/tasks.pkl".format(rundir), 'rb', seconds=2) as f:
        tasks = []
        try:
            # Each pickle.load() consumes exactly one checkpoint record;
            # the stream ends by raising EOFError, which terminates the loop.
            while True:
                tasks.append(pickle.load(f))
        except EOFError:
            pass
        print("Tasks from cache : ", tasks)
        # Only the single successful task should have been checkpointed.
        assert len(tasks) == 1, "Expected {} checkpoint events, got {}".format(1, len(tasks))