"""Tests for the MPI resource management task schedulers."""
import logging
import os
import pickle
from unittest import mock
import pytest
from parsl.executors.high_throughput.mpi_resource_management import (
MPITaskScheduler,
TaskScheduler,
)
from parsl.multiprocessing import SpawnContext
from parsl.serialize import pack_res_spec_apply_message, unpack_res_spec_apply_message
@pytest.fixture(autouse=True)
def set_pbs_nodefile_envvars():
    """Point PBS_NODEFILE at the 8-node mock nodefile for every test in this module."""
    mocks_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "mocks")
    nodefile_path = os.path.join(mocks_dir, "pbs_nodefile.8")
    with mock.patch.dict(os.environ, {"PBS_NODEFILE": nodefile_path}):
        yield
@pytest.mark.local
def test_NoopScheduler():
    """The base TaskScheduler passes tasks and results straight through unchanged."""
    task_q = SpawnContext.Queue()
    result_q = SpawnContext.Queue()
    scheduler = TaskScheduler(task_q, result_q)

    # A task put on the scheduler appears verbatim on the task queue.
    scheduler.put_task("TaskFoo")
    assert task_q.get() == "TaskFoo"

    # A result on the result queue is returned verbatim by get_result.
    result_q.put("Result1")
    assert scheduler.get_result(True, 1) == "Result1"
@pytest.mark.local
def test_MPISched_put_task():
    """Scheduling a 2-node task decrements the free-node counter accordingly."""
    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)

    # The mock PBS nodefile advertises 8 nodes, all initially free.
    assert scheduler.available_nodes
    assert len(scheduler.available_nodes) == 8
    assert scheduler._free_node_counter.value == 8

    buffer = pack_res_spec_apply_message(
        "func",
        "args",
        "kwargs",
        resource_specification={"num_nodes": 2, "ranks_per_node": 2},
    )
    scheduler.put_task({"task_id": 1, "buffer": buffer})

    # 2 of the 8 nodes should now be provisioned to the task.
    assert scheduler._free_node_counter.value == 6
@pytest.mark.local
def test_MPISched_get_result():
    """Fetching a task's result returns its provisioned nodes to the free pool."""
    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)

    assert scheduler.available_nodes
    assert len(scheduler.available_nodes) == 8
    assert scheduler._free_node_counter.value == 8

    # Manually mark 4 nodes as provisioned to task 1.
    provisioned = [scheduler.nodes_q.get() for _ in range(4)]
    scheduler._free_node_counter.value = 4
    scheduler._map_tasks_to_nodes[1] = provisioned

    # Simulate the worker reporting task 1 complete.
    result_package = pickle.dumps({"task_id": 1, "type": "result", "buffer": "Foo"})
    result_q.put(result_package)

    received = scheduler.get_result(block=True, timeout=1)
    assert received == result_package
    # All 8 nodes should be free again once the result is consumed.
    assert scheduler._free_node_counter.value == 8
@pytest.mark.local
def test_MPISched_roundtrip():
    """Tasks of increasing node counts each release all their nodes on completion.

    Fix: the loop variable was named ``round``, shadowing the builtin
    ``round``; renamed to ``num_nodes`` (it doubles as the task id).
    """
    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)
    assert scheduler.available_nodes
    assert len(scheduler.available_nodes) == 8

    for num_nodes in range(1, 9):
        # All nodes must be free before each task is scheduled.
        assert scheduler._free_node_counter.value == 8
        mock_task_buffer = pack_res_spec_apply_message(
            "func",
            "args",
            "kwargs",
            resource_specification={"num_nodes": num_nodes,
                                    "ranks_per_node": 2},
        )
        task_package = {"task_id": num_nodes, "buffer": mock_task_buffer}
        scheduler.put_task(task_package)
        assert scheduler._free_node_counter.value == 8 - num_nodes

        # Pop in a mock result; consuming it should release the nodes.
        result_pkl = pickle.dumps(
            {"task_id": num_nodes, "type": "result", "buffer": "RESULT BUF"})
        result_q.put(result_pkl)
        got_result = scheduler.get_result(True, 1)
        assert got_result == result_pkl
@pytest.mark.local
def test_MPISched_contention():
    """Second task has to wait for the first task due to insufficient resources"""
    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
    scheduler = MPITaskScheduler(task_q, result_q)

    assert scheduler.available_nodes
    assert len(scheduler.available_nodes) == 8
    assert scheduler._free_node_counter.value == 8

    # Task 1 requests every node in the pool.
    first_buffer = pack_res_spec_apply_message(
        "func",
        "args",
        "kwargs",
        resource_specification={
            "num_nodes": 8,
            "ranks_per_node": 2
        },
    )
    scheduler.put_task({"task_id": 1, "buffer": first_buffer})
    assert scheduler._free_node_counter.value == 0
    assert scheduler._backlog_queue.empty()

    # Task 2 also requests every node; nothing is free.
    second_buffer = pack_res_spec_apply_message(
        "func",
        "args",
        "kwargs",
        resource_specification={
            "num_nodes": 8,
            "ranks_per_node": 2
        },
    )
    scheduler.put_task({"task_id": 2, "buffer": second_buffer})
    # Second task should now be in the backlog_queue
    assert not scheduler._backlog_queue.empty()

    # Confirm that the first task is available and has all 8 nodes provisioned
    dispatched = task_q.get()
    assert dispatched['task_id'] == 1
    _, _, _, resource_spec = unpack_res_spec_apply_message(dispatched['buffer'])
    assert len(resource_spec['MPI_NODELIST'].split(',')) == 8
    assert task_q.empty()  # Confirm that task 2 is not yet scheduled

    # Simulate worker returning result and the scheduler picking up result
    result_pkl = pickle.dumps({"task_id": 1, "type": "result", "buffer": "RESULT BUF"})
    result_q.put(result_pkl)
    assert scheduler.get_result(True, 1) == result_pkl

    # Now task2 must be scheduled
    assert scheduler._backlog_queue.empty()

    # Task 2 should be dispatched with all 8 nodes provisioned.
    dispatched = task_q.get()
    assert dispatched['task_id'] == 2
    _, _, _, resource_spec = unpack_res_spec_apply_message(dispatched['buffer'])
    assert len(resource_spec['MPI_NODELIST'].split(',')) == 8