1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
import logging
import pickle
import socket
from parsl.monitoring.radios.base import MonitoringRadioSender
class UDPRadioSender(MonitoringRadioSender):
def __init__(self, monitoring_url: str, timeout: int = 10):
"""
Parameters
----------
monitoring_url : str
URL of the form <scheme>://<IP>:<PORT>
timeout : int
timeout, default=10s
"""
self.monitoring_url = monitoring_url
self.sock_timeout = timeout
try:
self.scheme, self.ip, port = (x.strip('/') for x in monitoring_url.split(':'))
self.port = int(port)
except Exception:
raise Exception("Failed to parse monitoring url: {}".format(monitoring_url))
self.sock = socket.socket(socket.AF_INET,
socket.SOCK_DGRAM,
socket.IPPROTO_UDP) # UDP
self.sock.settimeout(self.sock_timeout)
def send(self, message: object) -> None:
""" Sends a message to the UDP receiver
Parameter
---------
message: object
Arbitrary pickle-able object that is to be sent
Returns:
None
"""
try:
buffer = pickle.dumps(message)
except Exception:
logging.exception("Exception during pickling", exc_info=True)
return
try:
self.sock.sendto(buffer, (self.ip, self.port))
except socket.timeout:
logging.error("Could not send message within timeout limit")
return
return
|