1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
import logging
import pickle
from parsl.monitoring.radios.base import MonitoringRadioSender
# Module-level logger named after this module; used by HTEXRadioSender.
logger = logging.getLogger(__name__)
class HTEXRadioSender(MonitoringRadioSender):
    """Monitoring radio sender that forwards messages via the HTEX result queue.

    ``send`` does not open any socket itself: it wraps each message in a
    dict tagged for the interchange, pickles it, and puts it on the queue
    published as ``parsl.executors.high_throughput.monitoring_info.result_queue``.
    """

    def __init__(self, monitoring_url: str, timeout: int = 10):
        """
        Parameters
        ----------

        monitoring_url : str
            URL of the form <scheme>://<IP>:<PORT>. Accepted but not used
            by this initialiser.
        timeout : int
            timeout, default=10s. Accepted but not used by this initialiser.
        """
        logger.info("htex-based monitoring channel initialising")

    def send(self, message: object) -> None:
        """ Sends a message through the HTEX result queue.

        The message is wrapped in a dict tagged ``'type': 'monitoring'``,
        pickled, and put on the result queue. If no result queue has been
        set, an error is logged and the message is dropped.

        Parameter
        ---------

        message: object
            Arbitrary pickle-able object that is to be sent

        Returns:
            None
        """
        # The queue is looked up on every call rather than cached on the
        # instance (presumably because it is assigned after this sender is
        # constructed -- confirm against the worker pool start-up code).
        import parsl.executors.high_throughput.monitoring_info

        result_queue = parsl.executors.high_throughput.monitoring_info.result_queue

        # Compare against None explicitly: a queue-like object may be falsy
        # when empty, and that must not be mistaken for "no queue configured".
        if result_queue is None:
            logger.error("result_queue is uninitialized - cannot put monitoring message")
            return

        # This message goes into the result queue tagged so that the
        # interchange treats it as a monitoring message, and so that
        # monitoring in turn treats it as a RESOURCE_INFO message (rather
        # than a NODE_INFO, which is the implicit default for messages from
        # the interchange). For the interchange, the outer wrapper needs to
        # be a dict.
        interchange_msg = {
            'type': 'monitoring',
            'payload': message
        }
        result_queue.put(pickle.dumps(interchange_msg))
|