1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
|
#
# This file is part of the PyMeasure package.
#
# Copyright (c) 2013-2024 PyMeasure Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
import logging
import time
import traceback
from queue import Queue
from .listeners import Recorder
from .procedure import Procedure
from .results import Results
from ..thread import StoppableThread
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
try:
import zmq
import cloudpickle
except ImportError:
zmq = None
cloudpickle = None
log.warning("ZMQ and cloudpickle are required for TCP communication")
class Worker(StoppableThread):
""" Worker runs the procedure and emits information about
the procedure and its status over a ZMQ TCP port. In a child
thread, a Recorder is run to write the results to
"""
def __init__(self, results, log_queue=None, log_level=logging.INFO, port=None):
""" Constructs a Worker to perform the Procedure
defined in the file at the filepath
"""
super().__init__()
self.port = port
if not isinstance(results, Results):
raise ValueError("Invalid Results object during Worker construction")
self.results = results
self.results.procedure.check_parameters()
self.results.procedure.status = Procedure.QUEUED
self.recorder = None
self.recorder_queue = Queue()
self.monitor_queue = Queue()
if log_queue is None:
log_queue = Queue()
self.log_queue = log_queue
self.log_level = log_level
global log
log = logging.getLogger()
log.setLevel(self.log_level)
# log.handlers = [] # Remove all other handlers
# log.addHandler(TopicQueueHandler(self.monitor_queue))
# log.addHandler(QueueHandler(self.log_queue))
self.context = None
self.publisher = None
if self.port is not None and zmq is not None:
try:
self.context = zmq.Context()
log.debug("Worker ZMQ Context: %r" % self.context)
self.publisher = self.context.socket(zmq.PUB)
self.publisher.bind('tcp://*:%d' % self.port)
log.info("Worker connected to tcp://*:%d" % self.port)
# wait so that the socket will be ready before starting to emit messages
time.sleep(0.3)
except Exception:
log.exception("Couldn't establish ZMQ publisher!")
self.context = None
self.publisher = None
def join(self, timeout=0):
try:
super().join(timeout)
except (KeyboardInterrupt, SystemExit):
log.warning("User stopped Worker join prematurely")
self.stop()
super().join(0)
def emit(self, topic, record):
""" Emits data of some topic over TCP """
log.debug("Emitting message: %s %s", topic, record)
try:
self.publisher.send_serialized(
record,
serialize=lambda rec: (topic.encode(), cloudpickle.dumps(rec)),
)
except (NameError, AttributeError):
pass # No dumps defined
if topic == 'results':
self.recorder.handle(record)
elif topic == 'status' or topic == 'progress':
self.monitor_queue.put((topic, record))
def handle_abort(self):
log.exception("User stopped Worker execution prematurely")
self.update_status(Procedure.ABORTED)
def handle_error(self):
log.exception("Worker caught an error on %r", self.procedure)
traceback_str = traceback.format_exc()
self.emit('error', traceback_str)
self.update_status(Procedure.FAILED)
def update_status(self, status):
self.procedure.status = status
self.emit('status', status)
def shutdown(self):
self.procedure.shutdown()
if self.should_stop() and self.procedure.status == Procedure.RUNNING:
self.update_status(Procedure.ABORTED)
elif self.procedure.status == Procedure.RUNNING:
self.update_status(Procedure.FINISHED)
self.emit('progress', 100.)
self.recorder.stop()
self.monitor_queue.put(None)
if self.context is not None:
# Cleanly close down ZMQ context and associated socket
# For some reason, we need to close the socket before the
# context, otherwise context termination hangs.
self.publisher.close()
self.context.term()
def run(self):
log.info("Worker thread started")
self.procedure = self.results.procedure
self.recorder = Recorder(self.results, self.recorder_queue)
self.recorder.start()
# locals()[self.procedures_file] = __import__(self.procedures_file)
# route Procedure methods & log
self.procedure.should_stop = self.should_stop
self.procedure.emit = self.emit
log.info("Worker started running an instance of %r", self.procedure.__class__.__name__)
self.update_status(Procedure.RUNNING)
self.emit('progress', 0.)
try:
self.procedure.startup()
self.procedure.evaluate_metadata()
self.results.store_metadata()
self.procedure.execute()
except (KeyboardInterrupt, SystemExit):
self.handle_abort()
except Exception:
self.handle_error()
finally:
self.shutdown()
self.stop()
def __repr__(self):
return "<{}(port={},procedure={},should_stop={})>".format(
self.__class__.__name__, self.port,
self.procedure.__class__.__name__,
self.should_stop()
)
|