File: test_manager_selector_by_block.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (51 lines) | stat: -rw-r--r-- 1,310 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import time

import pytest

import parsl
from parsl.app.app import bash_app, python_app
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.executors.high_throughput.manager_selector import (
    BlockIdManagerSelector,
    ManagerSelector,
)
from parsl.launchers import WrappedLauncher
from parsl.providers import LocalProvider
from parsl.usage_tracking.levels import LEVEL_1

BLOCK_COUNT = 2


@parsl.python_app
def get_worker_pid():
    import os
    return os.environ.get('PARSL_WORKER_BLOCK_ID')


@pytest.mark.local
def test_block_id_selection(try_assert):
    htex = HighThroughputExecutor(
        label="htex_local",
        max_workers_per_node=1,
        manager_selector=BlockIdManagerSelector(),
        provider=LocalProvider(
            init_blocks=BLOCK_COUNT,
            max_blocks=BLOCK_COUNT,
            min_blocks=BLOCK_COUNT,
        ),
    )

    config = Config(
        executors=[htex],
        usage_tracking=LEVEL_1,
    )

    with parsl.load(config):
        blockids = []
        try_assert(lambda: len(htex.connected_managers()) == BLOCK_COUNT, timeout_ms=20000)
        for i in range(10):
            future = get_worker_pid()
            blockids.append(future.result())

        assert all(blockid == "1" for blockid in blockids)