File: monitoring.py

package info (click to toggle)
python-parsl 2026.02.23+ds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 12,144 kB
  • sloc: python: 24,449; makefile: 352; sh: 252; ansic: 45
file content (160 lines) | stat: -rw-r--r-- 6,602 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from __future__ import annotations

import logging
import multiprocessing.synchronize as ms
import os
import warnings
from multiprocessing.queues import Queue
from typing import Any, Optional, Union

import typeguard

from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import (
    SpawnEvent,
    SpawnProcess,
    SpawnQueue,
    join_terminate_close_proc,
)
from parsl.utils import RepresentationMixin

# Holds any exception raised while importing the database manager module,
# so that MonitoringHub.__init__ can re-raise it at configuration time
# instead of failing later at start(). Presumably this import fails when
# optional database dependencies are not installed — verify against
# parsl.monitoring.db_manager.
_db_manager_excepts: Optional[Exception]


try:
    from parsl.monitoring.db_manager import dbm_starter
except Exception as e:
    # Defer the failure: record it rather than breaking module import.
    _db_manager_excepts = e
else:
    _db_manager_excepts = None

logger = logging.getLogger(__name__)


@typeguard.typechecked
class MonitoringHub(RepresentationMixin):
    """Coordinates workflow monitoring for a Parsl run.

    Owns the database manager (DBM) subprocess which drains tagged
    monitoring messages from a multiprocessing queue (``resource_msgs``)
    into the configured database. ``start()`` launches the DBM process;
    ``close()`` signals it to exit, joins it, and closes the queue.
    """

    def __init__(self,
                 hub_address: Any = None,  # unused, so no type enforcement
                 hub_port_range: Any = None,  # unused, so no type enforcement
                 hub_port: Any = None,  # unused, so no type enforcement

                 workflow_name: Optional[str] = None,
                 workflow_version: Optional[str] = None,
                 logging_endpoint: Optional[str] = None,
                 monitoring_debug: bool = False,
                 resource_monitoring_enabled: bool = True,
                 resource_monitoring_interval: float = 30):  # in seconds
        """
        Parameters
        ----------
        hub_address : unused
        hub_port : unused
             Unused, but probably retained until 2026-06-01 to give deprecation warning.
             These two values previously configured UDP parameters when UDP was used
             for monitoring messages from workers. These are now configured on the
             relevant UDPRadio.
        hub_port_range : unused
             Unused, but probably retained until 2026-06-01 to give deprecation warning.
             This value previously configured one ZMQ channel inside the
             HighThroughputExecutor. That ZMQ channel is now configured by the
             interchange_port_range parameter of HighThroughputExecutor.
        workflow_name : str
             The name for the workflow. Default to the name of the parsl script
        workflow_version : str
             The version of the workflow. Default to the beginning datetime of the parsl script
        logging_endpoint : str
             The database connection url for monitoring to log the information.
             These URLs follow RFC-1738, and can include username, password, hostname, database name.
             Default: sqlite, in the configured run_dir.
        monitoring_debug : Bool
             Enable monitoring debug logging. Default: False
        resource_monitoring_enabled : boolean
             Set this field to True to enable logging of information from the worker side.
             This will include environment information such as start time, hostname and block id,
             along with periodic resource usage of each task. Default: True
        resource_monitoring_interval : float
             The time interval, in seconds, at which the monitoring records the resource usage of each task.
             If set to 0, only start and end information will be logged, and no periodic monitoring will
             be made.
             Default: 30 seconds

        Raises
        ------
        Exception
             Re-raises any exception captured when importing the database
             manager at module load time, so misconfiguration surfaces at
             construction rather than at start().
        """

        if _db_manager_excepts:
            raise _db_manager_excepts

        # Guard flag read by close(); initialised here so that close() is
        # safe (a no-op) even when start() was never invoked. Previously
        # this was only assigned in start(), making close() raise
        # AttributeError on an unstarted hub.
        self.monitoring_hub_active = False

        # The following three parameters need to exist as attributes to be
        # output by RepresentationMixin.
        if hub_address is not None:
            message = "Instead of MonitoringHub.hub_address, specify UDPRadio(address=...)"
            # stacklevel=2 attributes the warning to the caller's
            # configuration code rather than to this constructor.
            warnings.warn(message, DeprecationWarning, stacklevel=2)
            logger.warning(message)

        self.hub_address = hub_address

        if hub_port is not None:
            message = "Instead of MonitoringHub.hub_port, specify UDPRadio(port=...)"
            warnings.warn(message, DeprecationWarning, stacklevel=2)
            logger.warning(message)

        self.hub_port = hub_port

        if hub_port_range is not None:
            message = "Instead of MonitoringHub.hub_port_range, Use HighThroughputExecutor.interchange_port_range"
            warnings.warn(message, DeprecationWarning, stacklevel=2)
            logger.warning(message)

        self.hub_port_range = hub_port_range

        self.logging_endpoint = logging_endpoint
        self.monitoring_debug = monitoring_debug

        self.workflow_name = workflow_name
        self.workflow_version = workflow_version

        self.resource_monitoring_enabled = resource_monitoring_enabled
        self.resource_monitoring_interval = resource_monitoring_interval

    def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
        """Launch the database manager process.

        Parameters
        ----------
        dfk_run_dir : str
             Run directory for this DFK instance; created if absent, and
             passed to the DBM process for its logs.
        config_run_dir : str or os.PathLike
             Configured run directory, used to place the default sqlite
             database when no logging_endpoint was given.
        """

        logger.debug("Starting MonitoringHub")

        # Default to a sqlite database inside the configured run directory.
        if self.logging_endpoint is None:
            self.logging_endpoint = f"sqlite:///{os.fspath(config_run_dir)}/monitoring.db"

        os.makedirs(dfk_run_dir, exist_ok=True)

        self.monitoring_hub_active = True

        # Queue over which monitoring messages reach the DBM process.
        self.resource_msgs: Queue[TaggedMonitoringMessage]
        self.resource_msgs = SpawnQueue()

        # Event used by close() to ask the DBM process to exit.
        self.dbm_exit_event: ms.Event
        self.dbm_exit_event = SpawnEvent()

        self.dbm_proc = SpawnProcess(target=dbm_starter,
                                     args=(self.resource_msgs,),
                                     kwargs={"run_dir": dfk_run_dir,
                                             "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
                                             "db_url": self.logging_endpoint,
                                             "exit_event": self.dbm_exit_event,
                                             },
                                     name="Monitoring-DBM-Process",
                                     daemon=True,
                                     )
        self.dbm_proc.start()
        logger.info("Started DBM process %s", self.dbm_proc.pid)
        logger.info("Monitoring Hub initialized")

    def close(self) -> None:
        """Shut down monitoring.

        Signals the DBM process to exit, joins it, and closes the message
        queue. Idempotent: calling close() twice, or on a hub that was
        never started, is a no-op.
        """
        logger.info("Terminating Monitoring Hub")
        if self.monitoring_hub_active:
            self.monitoring_hub_active = False
            logger.debug("Waiting for DB termination")
            self.dbm_exit_event.set()
            join_terminate_close_proc(self.dbm_proc)
            logger.debug("Finished waiting for DBM termination")

            logger.info("Closing monitoring multiprocessing queues")
            self.resource_msgs.close()
            self.resource_msgs.join_thread()
            logger.info("Closed monitoring multiprocessing queues")