1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
import logging
import pickle
from multiprocessing.queues import Queue
from parsl.monitoring.radios.base import (
MonitoringRadioReceiver,
MonitoringRadioSender,
RadioConfig,
)
from parsl.utils import RepresentationMixin
logger = logging.getLogger(__name__)
class HTEXRadio(RadioConfig, RepresentationMixin):
def create_sender(self) -> MonitoringRadioSender:
return HTEXRadioSender()
def create_receiver(self, *, run_dir: str, resource_msgs: Queue) -> MonitoringRadioReceiver:
return HTEXRadioReceiver()
class HTEXRadioSender(MonitoringRadioSender):
def __init__(self) -> None:
# there is nothing to initialize
pass
def send(self, message: object) -> None:
""" Sends a message to the UDP receiver
Parameter
---------
message: object
Arbitrary pickle-able object that is to be sent
Returns:
None
"""
import parsl.executors.high_throughput.monitoring_info
result_queue = parsl.executors.high_throughput.monitoring_info.result_queue
# this message needs to go in the result queue tagged so that it is treated
# i) as a monitoring message by the interchange, and then further more treated
# as a RESOURCE_INFO message when received by monitoring (rather than a NODE_INFO
# which is the implicit default for messages from the interchange)
# for the interchange, the outer wrapper, this needs to be a dict:
interchange_msg = {
'type': 'monitoring',
'payload': message
}
if result_queue:
result_queue.put(pickle.dumps(interchange_msg))
else:
logger.error("result_queue is uninitialized - cannot put monitoring message")
class HTEXRadioReceiver(MonitoringRadioReceiver):
def shutdown(self) -> None:
# there is nothing to shut down
pass
|