1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
|
"""Core error utilities."""
from __future__ import annotations
import logging
from pathlib import Path
import re
import signal
import sys
import threading
import traceback
from typing import NamedTuple
import pyvista
from pyvista.core import _vtk_core as _vtk
def set_error_output_file(filename):
    """Redirect VTK error output to a file.

    Parameters
    ----------
    filename : str, Path
        Path to the file to write VTK errors to. The user directory is
        expanded and the path is resolved before it is handed to VTK.

    Returns
    -------
    vtkFileOutputWindow
        VTK file output window configured with ``filename``.

    vtkOutputWindow
        VTK output window whose instance was set to the file output window.

    """
    target = Path(filename).expanduser().resolve()
    file_window = _vtk.vtkFileOutputWindow()

    # VTK releases before 9.2.2 are given the path as a plain string.
    if pyvista.vtk_version_info < (9, 2, 2):  # pragma no cover
        name_argument = str(target)
    else:
        name_argument = target
    file_window.SetFileName(name_argument)

    window = _vtk.vtkOutputWindow()
    window.SetInstance(file_window)
    return file_window, window
class VtkErrorCatcher:
    """Context manager to temporarily catch VTK errors.

    On entry the current VTK output window instance is saved and replaced
    with a ``vtkStringOutputWindow`` that is watched by an ``Observer``.
    On exit the saved instance is put back and the recorded events are
    stored in the ``events`` attribute.

    Parameters
    ----------
    raise_errors : bool, default: False
        Raise a ``RuntimeError`` when a VTK error is encountered.

    send_to_logging : bool, default: True
        Determine whether VTK errors raised within the context should
        also be sent to logging.

    Examples
    --------
    Catch VTK errors using the context manager.

    >>> import pyvista as pv
    >>> with pv.VtkErrorCatcher() as error_catcher:
    ...     sphere = pv.Sphere()
    ...

    """

    def __init__(self, raise_errors=False, send_to_logging=True):
        """Initialize context manager."""
        self.raise_errors = raise_errors
        self.send_to_logging = send_to_logging
        # Replaced with the observer's event history when the context exits;
        # defined up front so the attribute can always be read.
        self.events = []

    def __enter__(self):
        """Observe VTK string output window for errors.

        Returns
        -------
        VtkErrorCatcher
            This instance, so that ``with VtkErrorCatcher() as catcher``
            binds the catcher itself.

        """
        error_output = _vtk.vtkStringOutputWindow()
        error_win = _vtk.vtkOutputWindow()
        # Remember the current instance so ``__exit__`` can restore it.
        self._error_output_orig = error_win.GetInstance()
        error_win.SetInstance(error_output)
        obs = Observer(log=self.send_to_logging, store_history=True)
        obs.observe(error_output)
        self._observer = obs
        return self

    def __exit__(self, *args):
        """Stop observing VTK string output window.

        Raises
        ------
        RuntimeError
            If ``raise_errors`` is set and at least one event was recorded.
            Its single argument is a list holding one ``RuntimeError`` per
            recorded event.

        """
        error_win = _vtk.vtkOutputWindow()
        error_win.SetInstance(self._error_output_orig)
        self.events = self._observer.event_history
        if self.raise_errors and self.events:
            errors = [RuntimeError(f'{e.kind}: {e.alert}', e.path, e.address) for e in self.events]
            raise RuntimeError(errors)
class VtkEvent(NamedTuple):
    """Named tuple to store VTK event information.

    Instances are created in ``Observer.__call__`` from the four values
    returned by ``Observer.parse_message``.
    """

    # Leading upper-case word of the message (compared against 'ERROR'
    # when logging); '' when the message could not be parsed.
    kind: str
    # Text captured between "In " and ", line"; '' when unparsed.
    path: str
    # Text captured inside the parentheses on the second line; '' when unparsed.
    address: str
    # Trailing message text, or the whole raw message when unparsed.
    alert: str
class Observer:
    """A standard class for observing VTK objects.

    Instances are callables meant to be registered with ``AddObserver``
    (see :meth:`observe`). Each call records the latest message, optionally
    appends a ``VtkEvent`` to ``event_history`` and optionally forwards the
    alert text to Python's ``logging`` module.

    Parameters
    ----------
    event_type : str, default: 'ErrorEvent'
        Event name passed to ``AddObserver`` by :meth:`observe`.

    log : bool, default: True
        Forward every received message to ``logging``.

    store_history : bool, default: False
        Append every received event to ``event_history``.

    """

    def __init__(self, event_type='ErrorEvent', log=True, store_history=False):
        """Initialize observer."""
        self.__event_occurred = False
        self.__message = None
        self.__message_etc = None
        # NOTE(review): presumably read by VTK's Python wrappers so that the
        # event call data reaches ``__call__`` as a string - confirm against
        # the VTK wrapper documentation.
        self.CallDataType = 'string0'
        self.__observing = False
        self.event_type = event_type
        self.__log = log
        self.store_history = store_history
        self.event_history = []

    @staticmethod
    def parse_message(message):
        """Parse the given message.

        Parameters
        ----------
        message : str
            Raw text received from VTK.

        Returns
        -------
        tuple
            ``(kind, path, address, alert)``. When ``message`` does not
            follow the expected layout (or is not a ``str``) the first three
            items are empty strings and ``alert`` is ``message`` unchanged.

        """
        # Message format
        regex = re.compile(r'([A-Z]+):\sIn\s(.+),\sline\s.+\n\w+\s\((.+)\):\s(.+)')
        try:
            match = regex.search(message)
        except TypeError:
            # Non-string input (e.g. bytes or None) cannot be matched.
            match = None
        if match is None:
            return '', '', '', message
        return match.groups()

    def log_message(self, kind, alert):
        """Parse different event types and passes them to logging.

        ``kind == 'ERROR'`` is logged as an error, anything else as a warning.
        """
        if kind == 'ERROR':
            logging.error(alert)
        else:
            logging.warning(alert)

    def __call__(self, _obj, _event, message):
        """Declare standard call function for the observer.

        On an event occurrence, this function executes.

        Parameters
        ----------
        _obj
            Unused.

        _event
            Unused.

        message : str
            Message text delivered with the event.

        """
        try:
            self.__event_occurred = True
            self.__message_etc = message
            kind, path, address, alert = self.parse_message(message)
            self.__message = alert
            if self.store_history:
                self.event_history.append(VtkEvent(kind, path, address, alert))
            if self.__log:
                self.log_message(kind, alert)
        except Exception as exc:  # pragma: no cover
            # Best effort only: report the failure without letting any
            # exception escape from the callback.
            try:
                if len(message) > 120:
                    message = f'{message[:100]!r} ... ({len(message)} characters)'
                else:
                    message = repr(message)
                print(
                    f'PyVista error in handling VTK error message:\n{message}',
                    file=sys.__stdout__,
                )
                # Use the traceback of the exception being handled;
                # ``sys.last_traceback`` is only set for unhandled exceptions.
                traceback.print_tb(exc.__traceback__, file=sys.__stderr__)
            except Exception:
                pass

    def has_event_occurred(self):
        """Ask self if an error has occurred since last queried.

        This resets the observer's status.

        Returns
        -------
        bool
            ``True`` if the observer was called since the previous query.

        """
        occ = self.__event_occurred
        self.__event_occurred = False
        return occ

    def get_message(self, etc=False):
        """Get the last set error message.

        Parameters
        ----------
        etc : bool, default: False
            Return the complete raw message instead of only the parsed
            alert text.

        Returns
        -------
        str
            The last set error message, or ``None`` if no message has been
            received yet.

        """
        if etc:
            return self.__message_etc
        return self.__message

    def observe(self, algorithm):
        """Make this an observer of an algorithm.

        Parameters
        ----------
        algorithm
            Object exposing ``AddObserver``. If it also has a
            ``GetExecutive`` method returning a non-``None`` object, that
            object is observed as well.

        Raises
        ------
        RuntimeError
            If this observer is already observing an algorithm.

        """
        if self.__observing:
            raise RuntimeError('This error observer is already observing an algorithm.')
        if hasattr(algorithm, 'GetExecutive') and algorithm.GetExecutive() is not None:
            algorithm.GetExecutive().AddObserver(self.event_type, self)
        algorithm.AddObserver(self.event_type, self)
        self.__observing = True
def send_errors_to_logging():
    """Send all VTK error/warning messages to Python's logging module.

    A ``vtkStringOutputWindow`` is installed through
    ``vtkOutputWindow.SetInstance`` and a default-configured ``Observer``
    is attached to it.

    Returns
    -------
    None
        The result of ``Observer.observe``, which has no return value.

    """
    string_window = _vtk.vtkStringOutputWindow()
    _vtk.vtkOutputWindow().SetInstance(string_window)
    observer = Observer()
    return observer.observe(string_window)
class ProgressMonitor:
    """A standard class for monitoring the progress of a VTK algorithm.

    This must be used in a ``with`` context and it will block keyboard
    interrupts from happening until the exit event as interrupts will crash
    the kernel if the VTK algorithm is still executing.

    Parameters
    ----------
    algorithm
        VTK algorithm or filter.

    message : str, default: ""
        Message to display in the progress bar.

    """

    def __init__(self, algorithm, message=""):
        """Initialize observer.

        Raises
        ------
        ImportError
            If ``tqdm`` cannot be imported.

        """
        try:
            from tqdm import tqdm  # noqa: F401
        except ImportError as err:
            raise ImportError("Please install `tqdm` to monitor algorithms.") from err
        self.event_type = _vtk.vtkCommand.ProgressEvent
        self.progress = 0.0
        self._last_progress = self.progress
        self.algorithm = algorithm
        self.message = message
        self._interrupt_signal_received = False
        self._old_progress = 0
        self._old_handler = None
        # True only while our SIGINT handler is installed by ``__enter__``.
        self._handler_installed = False
        self._progress_bar = None

    def handler(self, sig, frame):
        """Pass signal to custom interrupt handler.

        The ``(sig, frame)`` pair is stored so that the next progress
        callback aborts the algorithm.
        """
        # NOTE(review): the stored signal is not re-raised anywhere in this
        # class even though the log message says it is delayed - confirm
        # whether the KeyboardInterrupt should be re-delivered on exit.
        self._interrupt_signal_received = (sig, frame)
        logging.debug('SIGINT received. Delaying KeyboardInterrupt until VTK algorithm finishes.')

    def __call__(self, obj, *args):
        """Call progress update callback.

        On an event occurrence, this function executes.

        Parameters
        ----------
        obj
            Object that emitted the event; must provide ``GetProgress`` and
            ``AbortExecuteOn``.

        *args
            Ignored.

        """
        if self._interrupt_signal_received:
            obj.AbortExecuteOn()
        else:
            progress = obj.GetProgress()
            # The bar is advanced by increments, so feed it the difference
            # to the previously reported progress.
            step = progress - self._old_progress
            self._progress_bar.update(step)
            self._old_progress = progress

    def __enter__(self):
        """Enter event for ``with`` context.

        Returns
        -------
        tqdm.tqdm
            The progress bar created for this context.

        """
        from tqdm import tqdm

        # Signal handlers can only be installed from the main thread.
        if threading.current_thread() is threading.main_thread():
            self._old_handler = signal.signal(signal.SIGINT, self.handler)
            self._handler_installed = True
        self._progress_bar = tqdm(
            total=1,
            leave=True,
            bar_format='{l_bar}{bar}[{elapsed}<{remaining}]',
        )
        self._progress_bar.set_description(self.message)
        self.algorithm.AddObserver(self.event_type, self)
        return self._progress_bar

    def __exit__(self, *args):
        """Exit event for ``with`` context."""
        try:
            self._progress_bar.total = 1
            self._progress_bar.refresh()
            self._progress_bar.close()
        finally:
            # Always detach from the algorithm and give SIGINT back, even if
            # finalising the progress bar failed.
            try:
                self.algorithm.RemoveObservers(self.event_type)
            finally:
                if self._handler_installed:
                    signal.signal(signal.SIGINT, self._old_handler)
                    self._handler_installed = False
|