File: multiprocessing.py

package info (click to toggle)
python-parsl 2025.11.10%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 12,124 kB
  • sloc: python: 24,375; makefile: 352; sh: 252; ansic: 45
file content (38 lines) | stat: -rw-r--r-- 1,373 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from multiprocessing import Queue

from parsl.monitoring.radios.base import (
    MonitoringRadioReceiver,
    MonitoringRadioSender,
    RadioConfig,
)
from parsl.utils import RepresentationMixin


class MultiprocessingQueueRadioSender(MonitoringRadioSender, RepresentationMixin):
    """Monitoring radio sender backed by a multiprocessing Queue.

    Intended for the submit side: components in the submit process, or in
    processes launched by multiprocessing, have access to a Queue that is
    shared with the monitoring database code, so messages bypass the
    monitoring router.
    """

    def __init__(self, queue: Queue) -> None:
        """Remember the queue that outgoing messages are placed on."""
        # Stored under the same name as the constructor parameter.
        self.queue = queue

    def send(self, message: object) -> None:
        """Put ``message`` onto the shared queue."""
        outbound = self.queue
        outbound.put(message)


class MultiprocessingQueueRadio(RadioConfig):
    """Radio configuration that carries monitoring messages over a Queue.

    Call order matters: ``create_receiver`` stores the message queue on this
    object, and ``create_sender`` builds senders around that stored queue.
    ``create_receiver`` must therefore be called first.
    """

    def create_sender(self) -> MonitoringRadioSender:
        """Return a sender that puts messages on the stored queue.

        Raises:
            AttributeError: if ``create_receiver`` has not been called yet,
                so no queue has been stored on this object.
        """
        try:
            queue = self._queue
        except AttributeError:
            # Same exception type as the bare attribute lookup would raise,
            # but with a message that explains the required call order.
            raise AttributeError(
                "MultiprocessingQueueRadio.create_sender() was called before "
                "create_receiver(); no message queue has been configured"
            ) from None
        return MultiprocessingQueueRadioSender(queue)

    def create_receiver(self, *, run_dir: str, resource_msgs: Queue) -> MonitoringRadioReceiver:
        """Store ``resource_msgs`` for later senders and return a receiver.

        Args:
            run_dir: not used by this implementation.
            resource_msgs: queue that senders created by ``create_sender``
                will put their messages on.
        """
        # This object is only for use with an in-process thread-pool so it
        # is fine to store a reference to the message queue directly.
        self._queue = resource_msgs
        return MultiprocessingQueueRadioReceiver()


class MultiprocessingQueueRadioReceiver(MonitoringRadioReceiver):
    """Receiver counterpart of the multiprocessing Queue radio.

    This class defines no state of its own, so shutting it down is a no-op.
    """

    def shutdown(self) -> None:
        """Do nothing; this class has nothing to release."""
        return None