import argparse
import pickle
import pytest
import parsl
from parsl.app.app import python_app
from parsl.tests.configs.local_threads_checkpoint_task_exit import config
from parsl.utils import time_limited_open
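
# The imported config is assumed to be a local ThreadPoolExecutor configuration
# with checkpointing on task exit enabled, roughly like this sketch (the
# authoritative definition lives in parsl.tests.configs):
#
#     config = Config(executors=[ThreadPoolExecutor()],
#                     checkpoint_mode='task_exit')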

def local_setup():
    global dfk
    dfk = parsl.load(config)


def local_teardown():
    parsl.dfk().cleanup()
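

# Parsl checkpointing builds on top of app memoization, so the app below is
# declared with cache=True; results of apps without caching enabled would not
# be checkpointed.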
@python_app(cache=True)
def slow_double(x, sleep_dur=1):
    import time
    time.sleep(sleep_dur)
    return x * 2


@pytest.mark.local
def test_at_task_exit(n=2):
    """Test checkpointing behavior with the task_exit checkpoint mode.
    """
    d = {}
    print("Launching: ", n)
    for i in range(0, n):
        d[i] = slow_double(i)
    print("Done launching")
    for i in range(0, n):
        d[i].result()

    # There are two potential race conditions here which are worth
    # being aware of when debugging this test:
    # i) .result() returning does not necessarily mean that a
    #    checkpoint has been written: it only means that the AppFuture
    #    has had its result set. In the DFK implementation at the time
    #    of writing, .result() returning gives no guarantee that the
    #    corresponding checkpoint has been written yet.
    # ii) time_limited_open imposes a fixed time limit. While that limit
    #     might seem generous at the time of writing, this is still a
    #     race against the checkpoint writer.
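    # Each pickle.load below is expected to yield one checkpoint record
    # (in current Parsl, presumably a dict holding the task's memoization
    # hash along with its result or exception); this is an assumption about
    # the on-disk format, and the test only relies on counting the records.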
    with time_limited_open("{}/checkpoint/tasks.pkl".format(dfk.run_dir), 'rb', seconds=5) as f:
        tasks = []
        try:
            while f:
                tasks.append(pickle.load(f))
        except EOFError:
            pass

    assert len(tasks) == n, "Expected {} checkpoint events, got {}".format(n, len(tasks))