1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
|
from __future__ import annotations
import logging
import multiprocessing.queues as mpq
import os
import pickle
import socket
import threading
import time
from multiprocessing.synchronize import Event
from typing import Optional, Tuple
import typeguard
import zmq
from parsl.log_utils import set_file_logger
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.process_loggers import wrap_with_logs
from parsl.utils import setproctitle
# Module-level logger. router_starter uses it to report MonitoringRouter
# construction failures, because the router's own file logger is created
# inside the constructor and therefore does not exist yet at that point.
logger = logging.getLogger(name=__name__)
class MonitoringRouter:
    """Receive monitoring messages over UDP and ZMQ and forward them onwards.

    Every received message is passed to ``self.target_radio``, a
    ``MultiprocessingQueueRadioSender`` wrapping the ``resource_msgs`` queue.
    The listeners run until ``exit_event`` is set.
    """

    def __init__(self,
                 *,
                 hub_address: str,
                 udp_port: Optional[int] = None,
                 zmq_port_range: Tuple[int, int] = (55050, 56000),
                 monitoring_hub_address: str = "127.0.0.1",
                 run_dir: str = ".",
                 logging_level: int = logging.INFO,
                 atexit_timeout: int = 3,  # in seconds
                 resource_msgs: mpq.Queue,
                 exit_event: Event,
                 ):
        """Create the router's log file, UDP socket and ZMQ receiver socket.

        Parameters
        ----------
        hub_address : str
            The ip address at which the workers will be able to reach the Hub.
            This class only stores and logs it.
        udp_port : int, optional
            The specific port at which workers will be able to reach the Hub via
            UDP. If None (or 0), an ephemeral port is chosen. Default: None
        zmq_port_range : tuple(int, int)
            Port range from which the ZMQ receiver socket picks a random port
            to bind on all interfaces. Default: (55050, 56000)
        monitoring_hub_address : str
            Not used by the router; accepted for backward compatibility.
            Default: "127.0.0.1"
        run_dir : str
            Parsl log directory path. ``monitoring_router.log`` is written here
            and the directory is created if missing. Default: '.'
        logging_level : int
            Logging level as defined in the logging module. Default: logging.INFO
        atexit_timeout : int
            After ``exit_event`` is set, the UDP listener keeps forwarding
            messages and stops once this many seconds pass without receiving
            one. Default: 3
        resource_msgs : multiprocessing.Queue
            A multiprocessing queue to receive messages to be routed onwards
            to the database process.
        exit_event : Event
            An event that the main Parsl process will set to signal that the
            monitoring router should shut down.

        Raises
        ------
        RuntimeError
            If an explicitly requested ``udp_port`` cannot be bound.
        """
        os.makedirs(run_dir, exist_ok=True)
        self.logger = set_file_logger(f"{run_dir}/monitoring_router.log",
                                      name="monitoring_router",
                                      level=logging_level)
        self.logger.debug("Monitoring router starting")

        self.hub_address = hub_address
        self.atexit_timeout = atexit_timeout

        # Receive timeout used by both listeners, so they can poll exit_event.
        self.loop_freq = 10.0  # milliseconds

        # Initialize the UDP socket
        self.udp_sock = socket.socket(socket.AF_INET,
                                      socket.SOCK_DGRAM,
                                      socket.IPPROTO_UDP)

        # We are trying to bind to all interfaces with 0.0.0.0
        if not udp_port:
            self.udp_sock.bind(('0.0.0.0', 0))
            self.udp_port = self.udp_sock.getsockname()[1]
        else:
            self.udp_port = udp_port
            try:
                self.udp_sock.bind(('0.0.0.0', self.udp_port))
            except Exception as e:
                # Don't leak the socket when construction fails.
                self.udp_sock.close()
                raise RuntimeError(f"Could not bind to udp_port {udp_port} because: {e}") from e
        self.udp_sock.settimeout(self.loop_freq / 1000)
        self.logger.info("Initialized the UDP socket on 0.0.0.0:{}".format(self.udp_port))

        self._context = zmq.Context()
        self.zmq_receiver_channel = self._context.socket(zmq.DEALER)
        self.zmq_receiver_channel.setsockopt(zmq.LINGER, 0)
        self.zmq_receiver_channel.set_hwm(0)
        self.zmq_receiver_channel.RCVTIMEO = int(self.loop_freq)  # in milliseconds
        self.logger.debug("hub_address: {}. zmq_port_range {}".format(hub_address, zmq_port_range))
        try:
            self.zmq_receiver_port = self.zmq_receiver_channel.bind_to_random_port("tcp://*",
                                                                                   min_port=zmq_port_range[0],
                                                                                   max_port=zmq_port_range[1])
        except Exception:
            # Release everything created so far before propagating the failure.
            self._close_sockets()
            raise

        self.target_radio = MultiprocessingQueueRadioSender(resource_msgs)
        self.exit_event = exit_event

    def _close_sockets(self) -> None:
        """Close the UDP and ZMQ sockets and terminate the private ZMQ context."""
        self.udp_sock.close()
        self.zmq_receiver_channel.close()
        # LINGER is 0, so term() does not wait on undelivered messages.
        self._context.term()

    @wrap_with_logs(target="monitoring_router")
    def start(self) -> None:
        """Run both listener threads until they finish, then close the sockets."""
        self.logger.info("Starting UDP listener thread")
        udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True)
        udp_radio_receiver_thread.start()

        self.logger.info("Starting ZMQ listener thread")
        zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener, daemon=True)
        zmq_radio_receiver_thread.start()

        self.logger.info("Joining on ZMQ listener thread")
        zmq_radio_receiver_thread.join()
        self.logger.info("Joining on UDP listener thread")
        udp_radio_receiver_thread.join()
        self.logger.info("Joined on both ZMQ and UDP listener threads")

        # Both listeners have exited, so nothing else uses the sockets now.
        self._close_sockets()

    def _recv_udp_message(self) -> bool:
        """Receive one UDP datagram and forward it to ``target_radio``.

        Returns True if a datagram arrived, even if it could not be decoded.
        Returns False if the socket timed out. A datagram that fails to
        unpickle is logged and dropped, rather than killing the listener
        thread. This matters because a datagram longer than the 2048-byte
        receive buffer is truncated and cannot be unpickled.
        """
        try:
            data, addr = self.udp_sock.recvfrom(2048)
        except socket.timeout:
            return False

        try:
            # NOTE(review): pickle.loads on datagrams from any host that can reach
            # this port (bound on 0.0.0.0) can execute arbitrary code; the port
            # must only be reachable from trusted networks.
            msg = pickle.loads(data)
        except Exception:
            self.logger.warning("Failure processing a UDP message from %s", addr, exc_info=True)
            return True

        self.logger.debug("Got UDP Message from %s: %s", addr, msg)
        self.target_radio.send(msg)
        return True

    @wrap_with_logs(target="monitoring_router")
    def start_udp_listener(self) -> None:
        """Forward UDP messages until ``exit_event`` is set, then drain.

        After the event is set, the listener keeps forwarding until
        ``atexit_timeout`` seconds pass with no datagram received.
        """
        try:
            while not self.exit_event.is_set():
                self._recv_udp_message()

            self.logger.info("UDP listener draining")
            # monotonic() is immune to wall-clock adjustments while measuring idle time.
            last_msg_received_time = time.monotonic()
            while time.monotonic() - last_msg_received_time < self.atexit_timeout:
                if self._recv_udp_message():
                    last_msg_received_time = time.monotonic()

            self.logger.info("UDP listener finishing normally")
        finally:
            self.logger.info("UDP listener finished")

    @wrap_with_logs(target="monitoring_router")
    def start_zmq_listener(self) -> None:
        """Forward 2-tuple messages from the ZMQ socket until ``exit_event`` is set.

        Unlike the UDP listener, there is no drain phase after the event is set.
        """
        try:
            while not self.exit_event.is_set():
                try:
                    dfk_loop_start = time.monotonic()
                    while time.monotonic() - dfk_loop_start < 1.0:  # TODO make configurable
                        # Only the outer shape (a 2-tuple) is checked below; the
                        # element types are not verified against the annotation.
                        msg: TaggedMonitoringMessage
                        # NOTE(review): recv_pyobj unpickles network data; see the
                        # pickle caveat on the UDP path.
                        msg = self.zmq_receiver_channel.recv_pyobj()

                        # Explicit checks rather than asserts, so validation also
                        # happens under `python -O`.
                        if not isinstance(msg, tuple):
                            raise ValueError("ZMQ Receiver expects only tuples, got {}".format(msg))
                        if len(msg) != 2:
                            raise ValueError("ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg))

                        self.target_radio.send(msg)
                except zmq.Again:
                    pass
                except Exception:
                    # This will catch malformed messages. What happens if the
                    # channel is broken in such a way that it always raises
                    # an exception? Looping on this would maybe be the wrong
                    # thing to do.
                    self.logger.warning("Failure processing a ZMQ message", exc_info=True)

            self.logger.info("ZMQ listener finishing normally")
        finally:
            self.logger.info("ZMQ listener finished")
@wrap_with_logs
@typeguard.typechecked
def router_starter(*,
                   comm_q: mpq.Queue,
                   exception_q: mpq.Queue,
                   resource_msgs: mpq.Queue,
                   exit_event: Event,

                   hub_address: str,
                   udp_port: Optional[int],
                   zmq_port_range: Tuple[int, int],

                   run_dir: str,
                   logging_level: int) -> None:
    """Construct a MonitoringRouter and run it until its listeners finish.

    If construction succeeds, ``(udp_port, zmq_receiver_port)`` is put on
    ``comm_q``; if it fails, an error string is put there instead.
    An exception escaping ``start()`` is reported on ``exception_q`` as
    ``('Hub', message)``.
    """
    setproctitle("parsl: monitoring router")

    try:
        monitoring_router = MonitoringRouter(hub_address=hub_address,
                                             udp_port=udp_port,
                                             zmq_port_range=zmq_port_range,
                                             run_dir=run_dir,
                                             logging_level=logging_level,
                                             resource_msgs=resource_msgs,
                                             exit_event=exit_event)
    except Exception as construction_error:
        logger.error("MonitoringRouter construction failed.", exc_info=True)
        comm_q.put(f"Monitoring router construction failed: {construction_error}")
        return

    # Report the ports the router actually bound.
    comm_q.put((monitoring_router.udp_port, monitoring_router.zmq_receiver_port))
    monitoring_router.logger.info("Starting MonitoringRouter in router_starter")

    try:
        monitoring_router.start()
    except Exception as start_error:
        monitoring_router.logger.exception("router.start exception")
        exception_q.put(('Hub', str(start_error)))
|