from __future__ import annotations
import logging
import multiprocessing.synchronize as ms
import os
import warnings
from multiprocessing.queues import Queue
from typing import Any, Optional, Union
import typeguard
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import (
SpawnEvent,
SpawnProcess,
SpawnQueue,
join_terminate_close_proc,
)
from parsl.utils import RepresentationMixin
# Holds any exception raised while importing the database manager, so the
# import failure is deferred: the module stays importable, and the error is
# re-raised only if monitoring is actually configured (in MonitoringHub).
_db_manager_excepts: Optional[Exception]
try:
    from parsl.monitoring.db_manager import dbm_starter
except Exception as e:
    # Record the failure rather than breaking module import.
    _db_manager_excepts = e
else:
    _db_manager_excepts = None
logger = logging.getLogger(__name__)
@typeguard.typechecked
class MonitoringHub(RepresentationMixin):
    """Configuration and lifecycle manager for Parsl monitoring.

    Starts a spawned database-manager process (see ``start``) that consumes
    tagged monitoring messages from a multiprocessing queue, and shuts that
    process down cleanly (see ``close``).
    """

    def __init__(self,
                 hub_address: Any = None,  # unused, so no type enforcement
                 hub_port_range: Any = None,  # unused, so no type enforcement
                 hub_port: Any = None,  # unused, so no type enforcement
                 workflow_name: Optional[str] = None,
                 workflow_version: Optional[str] = None,
                 logging_endpoint: Optional[str] = None,
                 monitoring_debug: bool = False,
                 resource_monitoring_enabled: bool = True,
                 resource_monitoring_interval: float = 30):  # in seconds
        """
        Parameters
        ----------
        hub_address : unused
        hub_port : unused
            Unused, but probably retained until 2026-06-01 to give deprecation warning.
            These two values previously configured UDP parameters when UDP was used
            for monitoring messages from workers. These are now configured on the
            relevant UDPRadio.
        hub_port_range : unused
            Unused, but probably retained until 2026-06-01 to give deprecation warning.
            This value previously configured one ZMQ channel inside the
            HighThroughputExecutor. That ZMQ channel is now configured by the
            interchange_port_range parameter of HighThroughputExecutor.
        workflow_name : str
            The name for the workflow. Default to the name of the parsl script
        workflow_version : str
            The version of the workflow. Default to the beginning datetime of the parsl script
        logging_endpoint : str
            The database connection url for monitoring to log the information.
            These URLs follow RFC-1738, and can include username, password, hostname, database name.
            Default: sqlite, in the configured run_dir.
        monitoring_debug : Bool
            Enable monitoring debug logging. Default: False
        resource_monitoring_enabled : boolean
            Set this field to True to enable logging of information from the worker side.
            This will include environment information such as start time, hostname and block id,
            along with periodic resource usage of each task. Default: True
        resource_monitoring_interval : float
            The time interval, in seconds, at which the monitoring records the resource usage of each task.
            If set to 0, only start and end information will be logged, and no periodic monitoring will
            be made.
            Default: 30 seconds
        """
        # Fail fast: if the database manager could not be imported at module
        # load time, surface that deferred error now, when monitoring is
        # actually being configured.
        if _db_manager_excepts:
            raise _db_manager_excepts

        # The following three parameters need to exist as attributes to be
        # output by RepresentationMixin, so they are assigned unconditionally
        # (not only when a deprecated value was supplied).
        if hub_address is not None:
            message = "Instead of MonitoringHub.hub_address, specify UDPRadio(address=...)"
            warnings.warn(message, DeprecationWarning)
            logger.warning(message)
        self.hub_address = hub_address

        if hub_port is not None:
            message = "Instead of MonitoringHub.hub_port, specify UDPRadio(port=...)"
            warnings.warn(message, DeprecationWarning)
            logger.warning(message)
        self.hub_port = hub_port

        if hub_port_range is not None:
            message = "Instead of MonitoringHub.hub_port_range, Use HighThroughputExecutor.interchange_port_range"
            warnings.warn(message, DeprecationWarning)
            logger.warning(message)
        self.hub_port_range = hub_port_range

        self.logging_endpoint = logging_endpoint
        self.monitoring_debug = monitoring_debug

        self.workflow_name = workflow_name
        self.workflow_version = workflow_version

        self.resource_monitoring_enabled = resource_monitoring_enabled
        self.resource_monitoring_interval = resource_monitoring_interval

        # Initialized here so that close() is safe to call even if start()
        # was never invoked (previously this raised AttributeError).
        self.monitoring_hub_active = False

    def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
        """Start the monitoring DB-manager process.

        Parameters
        ----------
        dfk_run_dir : str
            Run directory for this DFK; created if absent and used for
            DB-manager logs.
        config_run_dir : str or os.PathLike
            Configured run directory, used to place the default sqlite
            database when no logging_endpoint was given.
        """
        logger.debug("Starting MonitoringHub")

        # Default the database URL to a sqlite file inside the configured
        # run directory if the user did not supply one.
        if self.logging_endpoint is None:
            self.logging_endpoint = f"sqlite:///{os.fspath(config_run_dir)}/monitoring.db"

        os.makedirs(dfk_run_dir, exist_ok=True)

        self.monitoring_hub_active = True

        # Queue carrying tagged monitoring messages to the DB manager.
        self.resource_msgs: Queue[TaggedMonitoringMessage]
        self.resource_msgs = SpawnQueue()

        # Event used by close() to ask the DB manager to exit.
        self.dbm_exit_event: ms.Event
        self.dbm_exit_event = SpawnEvent()

        self.dbm_proc = SpawnProcess(target=dbm_starter,
                                     args=(self.resource_msgs,),
                                     kwargs={"run_dir": dfk_run_dir,
                                             "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
                                             "db_url": self.logging_endpoint,
                                             "exit_event": self.dbm_exit_event,
                                             },
                                     name="Monitoring-DBM-Process",
                                     daemon=True,
                                     )
        self.dbm_proc.start()
        logger.info("Started DBM process %s", self.dbm_proc.pid)

        logger.info("Monitoring Hub initialized")

    def close(self) -> None:
        """Shut down the DB-manager process and release its queue.

        Idempotent: a second call (or a call before start()) is a no-op
        because monitoring_hub_active is False.
        """
        logger.info("Terminating Monitoring Hub")
        if self.monitoring_hub_active:
            self.monitoring_hub_active = False

            logger.debug("Waiting for DB termination")
            # Signal the DB manager to drain and exit, then join it
            # (escalating to terminate/close if it does not exit).
            self.dbm_exit_event.set()
            join_terminate_close_proc(self.dbm_proc)
            logger.debug("Finished waiting for DBM termination")

            logger.info("Closing monitoring multiprocessing queues")
            self.resource_msgs.close()
            self.resource_msgs.join_thread()
            logger.info("Closed monitoring multiprocessing queues")