1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
import os
import signal
import time
import pytest
import zmq
import parsl
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider
T_s = 1
def fresh_config():
htex = HighThroughputExecutor(
heartbeat_period=1 * T_s,
heartbeat_threshold=3 * T_s,
label="htex_local",
worker_debug=True,
cores_per_worker=1,
encrypted=False,
provider=LocalProvider(
init_blocks=0,
min_blocks=0,
max_blocks=0,
launcher=SimpleLauncher(),
),
)
c = Config(
executors=[htex],
strategy='none',
strategy_period=0.5,
)
return c, htex
@parsl.python_app
def app():
return 7
@pytest.mark.local
@pytest.mark.parametrize("msg",
(b'FuzzyByte\rSTREAM', # not JSON
b'{}', # missing fields
b'{"type":"heartbeat"}', # regression test #3262
)
)
def test_bad_messages(try_assert, msg):
"""This tests that the interchange is resilient to a few different bad
messages: malformed messages caused by implementation errors, and
heartbeat messages from managers that are not registered.
The heartbeat test is a regression test for issues #3262, #3632
"""
c, htex = fresh_config()
with parsl.load(c):
# send a bad message into the interchange on the task_outgoing worker
# channel, and then check that the interchange is still alive enough
# that we can scale out a block and run a task.
(task_port, result_port) = htex.command_client.run("WORKER_PORTS")
context = zmq.Context()
channel_timeout = 10000 # in milliseconds
task_channel = context.socket(zmq.DEALER)
task_channel.setsockopt(zmq.LINGER, 0)
task_channel.setsockopt(zmq.IDENTITY, b'testid')
task_channel.set_hwm(0)
task_channel.setsockopt(zmq.SNDTIMEO, channel_timeout)
task_channel.connect(f"tcp://localhost:{task_port}")
task_channel.send(msg)
# If the interchange exits, it's likely that this test will hang rather
# than raise an error, because the interchange interaction code
# assumes the interchange is always there.
# In the case of issue #3262, an exception message goes to stderr, and
# no error goes to the interchange log file.
htex.scale_out_facade(1)
try_assert(lambda: len(htex.connected_managers()) == 1, timeout_ms=10000)
assert app().result() == 7
|