1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
|
import logging
import os
import pickle
import uuid
from multiprocessing.queues import Queue
from parsl.monitoring.radios.base import (
MonitoringRadioReceiver,
MonitoringRadioSender,
RadioConfig,
)
from parsl.monitoring.radios.filesystem_router import FilesystemRadioReceiver
from parsl.utils import RepresentationMixin
logger = logging.getLogger(__name__)
class FilesystemRadio(RadioConfig, RepresentationMixin):
    """Radio configuration that carries monitoring messages over a shared
    filesystem.

    The message directory structure is based on maildir,
    https://en.wikipedia.org/wiki/Maildir

    The writer creates a message in tmp/ and then, once it is fully
    written, moves it atomically into new/

    The reader ignores tmp/ and only reads and deletes messages from
    new/

    This avoids the race condition of reading partially written messages.

    This radio is likely to give higher shared filesystem load compared to
    the UDP radio, but should be much more reliable.
    """

    def create_sender(self) -> MonitoringRadioSender:
        """Return a sender that writes into the recorded run directory.

        Reads ``self.run_dir``, which within this class is only assigned by
        ``create_receiver``.
        NOTE(review): presumably ``create_receiver`` must therefore be called
        first -- confirm that no base class sets ``run_dir``.
        """
        sender = FilesystemRadioSender(run_dir=self.run_dir)
        return sender

    def create_receiver(
        self, *, run_dir: str, resource_msgs: Queue
    ) -> MonitoringRadioReceiver:
        """Record ``run_dir`` on this config and return a receiver for it.

        :param run_dir: run directory; stored as ``self.run_dir`` so that a
            later ``create_sender`` call can use the same location.
        :param resource_msgs: queue handed to the receiver.
        """
        # Remember the directory before building the receiver, so the sender
        # side can be created from this same object afterwards.
        self.run_dir = run_dir
        receiver = FilesystemRadioReceiver(resource_msgs, run_dir)
        return receiver
class FilesystemRadioSender(MonitoringRadioSender):
    """Sends monitoring messages as pickled files under a run directory.

    Each message is first written to ``<run_dir>/monitor-fs-radio/tmp`` and
    then renamed into ``<run_dir>/monitor-fs-radio/new``.
    """

    def __init__(self, *, run_dir: str):
        """Create the tmp/ and new/ message directories if they are missing.

        :param run_dir: directory under which ``monitor-fs-radio/`` is
            created.
        """
        logger.info("filesystem based monitoring radio initializing")

        self.base_path = f"{run_dir}/monitor-fs-radio/"
        # base_path already ends with a separator, so join (rather than
        # f"{base}/tmp") avoids producing a doubled slash in the paths.
        self.tmp_path = os.path.join(self.base_path, "tmp")
        self.new_path = os.path.join(self.base_path, "new")

        os.makedirs(self.tmp_path, exist_ok=True)
        os.makedirs(self.new_path, exist_ok=True)

    def send(self, message: object) -> None:
        """Pickle ``message`` into tmp/ and then rename it into new/.

        :param message: any picklable object.
        :raises Exception: whatever ``open``, ``pickle.dump`` or
            ``os.rename`` raise is propagated unchanged; the temporary file
            is removed first so failed sends do not accumulate in tmp/.
        """
        logger.info("Sending a monitoring message via filesystem")

        unique_id = str(uuid.uuid4())
        tmp_filename = os.path.join(self.tmp_path, unique_id)
        new_filename = os.path.join(self.new_path, unique_id)

        # this will write the message out then atomically
        # move it into new/, so that a partially written
        # file will never be observed in new/
        try:
            with open(tmp_filename, "wb") as f:
                pickle.dump(message, f)
            os.rename(tmp_filename, new_filename)
        except BaseException:
            # Best-effort cleanup of the partial/unmoved file; the original
            # error is what the caller needs to see, so a failure to remove
            # (e.g. the file was never created) is deliberately ignored.
            try:
                os.remove(tmp_filename)
            except OSError:
                pass
            raise
|