File: test_drain.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (79 lines) | stat: -rw-r--r-- 2,370 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import time

import pytest

import parsl
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider

# this constant is used to scale some durations that happen
# based around the expected drain period: the drain period
# is TIME_CONST seconds, and the single executed task will
# last twice that many number of seconds.
TIME_CONST = 4

CONNECTED_MANAGERS_POLL_MS = 100


def local_config():
    return Config(
        executors=[
            HighThroughputExecutor(
                label="htex_local",
                drain_period=TIME_CONST,
                worker_debug=True,
                cores_per_worker=1,
                encrypted=True,
                provider=LocalProvider(
                    init_blocks=1,
                    min_blocks=0,
                    max_blocks=0,
                    launcher=SimpleLauncher(),
                ),
            )
        ],
        strategy='none',
        strategy_period=0.1
    )


@parsl.python_app
def f(n):
    import time
    time.sleep(n)


@pytest.mark.local
def test_drain(try_assert):

    htex = parsl.dfk().executors['htex_local']

    # wait till we have a block running...

    try_assert(lambda: len(htex.connected_managers()) == 1, check_period_ms=CONNECTED_MANAGERS_POLL_MS)

    managers = htex.connected_managers()
    assert managers[0]['active'], "The manager should be active"
    assert not managers[0]['draining'], "The manager should not be draining"

    fut = f(TIME_CONST * 2)

    time.sleep(TIME_CONST)

    # this assert should happen *very fast* after the above delay...
    try_assert(lambda: htex.connected_managers()[0]['draining'], timeout_ms=500, check_period_ms=CONNECTED_MANAGERS_POLL_MS)

    # and the test task should still be running...
    assert not fut.done(), "The test task should still be running"

    fut.result()

    # and now we should see the manager disappear...
    # ... with strategy='none', this should be coming from draining but
    # that information isn't immediately obvious from the absence in
    # connected managers.
    # As with the above draining assert, this should happen very fast after
    # the task ends.
    try_assert(lambda: len(htex.connected_managers()) == 0, timeout_ms=500, check_period_ms=CONNECTED_MANAGERS_POLL_MS)