1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416
|
# -*- coding: utf-8 -*-
"""Parser function
:copyright: 2014-2022 by PyVISA-sim Authors, see AUTHORS for more details.
:license: MIT, see LICENSE for more details.
"""
import importlib.resources
import os
import pathlib
from contextlib import closing
from io import StringIO, open
from traceback import format_exc
from typing import (
Any,
BinaryIO,
Dict,
Generic,
Literal,
Mapping,
TextIO,
Tuple,
TypeVar,
Union,
)
import yaml
from .channels import Channels
from .component import Component, NoResponse, Responses
from .devices import Device, Devices
def _ver_to_tuple(ver: str) -> Tuple[int, ...]:
return tuple(map(int, (ver.split("."))))
#: Version of the specification
SPEC_VERSION = "1.1"
SPEC_VERSION_TUPLE = _ver_to_tuple(SPEC_VERSION)
# FIXME does not allow to alter an inherited dialogue, property, etc
K = TypeVar("K")
V = TypeVar("V")
class SimpleChainmap(Generic[K, V]):
"""Combine multiple mappings for sequential lookup."""
def __init__(self, *maps: Mapping[K, V]) -> None:
self._maps = maps
def __getitem__(self, key: K) -> V:
for mapping in self._maps:
try:
return mapping[key]
except KeyError:
pass
raise KeyError(key)
def _get_pair(dd: Dict[str, str]) -> Tuple[str, str]:
"""Return a pair from a dialogue dictionary."""
return dd["q"].strip(" "), dd["r"].strip(" ") if "r" in dd else NoResponse # type: ignore[return-value]
def _get_triplet(
dd: Dict[str, str]
) -> Tuple[str, Union[str, Literal[Responses.NO]], Union[str, Literal[Responses.NO]]]:
"""Return a triplet from a dialogue dictionary."""
return (
dd["q"].strip(" "),
dd["r"].strip(" ") if "r" in dd else NoResponse,
dd["e"].strip(" ") if "e" in dd else NoResponse,
)
def _load(content_or_fp: Union[str, bytes, TextIO, BinaryIO]) -> Dict[str, Any]:
"""YAML Parse a file or str and check version."""
try:
data = yaml.load(content_or_fp, Loader=yaml.loader.BaseLoader)
except Exception as e:
raise type(e)("Malformed yaml file:\n%r" % format_exc())
try:
ver = data["spec"]
except Exception as e:
raise ValueError("The file does not specify a spec version") from e
try:
ver = tuple(map(int, (ver.split("."))))
except Exception as e:
raise ValueError(
"Invalid spec version format. Expect 'X.Y'"
" (X and Y integers), found %s" % ver
) from e
if ver > SPEC_VERSION_TUPLE:
raise ValueError(
"The spec version of the file is "
"%s but the parser is %s. "
"Please update pyvisa-sim." % (ver, SPEC_VERSION)
)
return data
def parse_resource(name: str) -> Dict[str, Any]:
"""Parse a resource file."""
with closing(importlib.resources.open_binary("pyvisa_sim", name)) as fp:
rbytes = fp.read()
return _load(StringIO(rbytes.decode("utf-8")))
def parse_file(fullpath: Union[str, pathlib.Path]) -> Dict[str, Any]:
"""Parse a file."""
with open(fullpath, encoding="utf-8") as fp:
return _load(fp)
def update_component(
name: str, comp: Component, component_dict: Dict[str, Any]
) -> None:
"""Get a component from a component dict."""
for dia in component_dict.get("dialogues", ()):
try:
comp.add_dialogue(*_get_pair(dia))
except Exception as e:
msg = "In device %s, malformed dialogue %s\n%r"
raise Exception(msg % (name, dia, e))
for prop_name, prop_dict in component_dict.get("properties", {}).items():
try:
getter = _get_pair(prop_dict["getter"]) if "getter" in prop_dict else None
setter = (
_get_triplet(prop_dict["setter"]) if "setter" in prop_dict else None
)
comp.add_property(
prop_name,
prop_dict.get("default", ""),
getter,
setter,
prop_dict.get("specs", {}),
)
except Exception as e:
msg = "In device %s, malformed property %s\n%r"
raise type(e)(msg % (name, prop_name, format_exc()))
def get_bases(definition_dict: Dict[str, Any], loader: "Loader") -> Dict[str, Any]:
"""Collect inherited behaviors."""
bases = definition_dict.get("bases", ())
if bases:
# FIXME this currently does not work
raise NotImplementedError
bases = (
loader.get_comp_dict(required_version=SPEC_VERSION_TUPLE[0], **b) # type: ignore
for b in bases
)
return SimpleChainmap(definition_dict, *bases)
else:
return definition_dict
def get_channel(
device: Device,
ch_name: str,
channel_dict: Dict[str, Any],
loader: "Loader",
resource_dict: Dict[str, Any],
) -> Channels:
"""Get a channels from a channels dictionary.
Parameters
----------
device : Device
Device from which to retrieve a channel
ch_name : str
Name of the channel to access
channel_dict : Dict[str, Any]
Definition of the channel.
loader : Loader
Loader containing all the loaded information.
resource_dict : Dict[str, Any]
Dictionary describing the resource to which the device is attached.
Returns
-------
Channels:
Channels for the device.
"""
cd = get_bases(channel_dict, loader)
r_ids = resource_dict.get("channel_ids", {}).get(ch_name, [])
ids = r_ids if r_ids else channel_dict.get("ids", {})
can_select = False if channel_dict.get("can_select") == "False" else True
channels = Channels(device, ids, can_select)
update_component(ch_name, channels, cd)
return channels
def get_device(
name: str,
device_dict: Dict[str, Any],
loader: "Loader",
resource_dict: Dict[str, str],
) -> Device:
"""Get a device from a device dictionary.
Parameters
----------
name : str
Name identifying the device.
device_dict : Dict[str, Any]
Dictionary describing the device.
loader : Loader
Global loader centralizing all devices information.
resource_dict : Dict[str, str]
Resource information to which the device is attached.
Returns
-------
Device
Accessed device
"""
device = Device(name, device_dict.get("delimiter", ";").encode("utf-8"))
device_dict = get_bases(device_dict, loader)
err = device_dict.get("error", {})
device.add_error_handler(err)
for itype, eom_dict in device_dict.get("eom", {}).items():
device.add_eom(itype, *_get_pair(eom_dict))
update_component(name, device, device_dict)
for ch_name, ch_dict in device_dict.get("channels", {}).items():
device.add_channels(
ch_name, get_channel(device, ch_name, ch_dict, loader, resource_dict)
)
return device
class Loader:
"""Loader handling accessing the definitions in YAML files.
Parameters
----------
filename : Union[str, pathlib.Path]
Path to the file to be loaded on creation.
bundled : bool
Is the file bundled with pyvisa-sim itself.
"""
#: Definitions loaded from a YAML file.
data: Dict[str, Any]
def __init__(self, filename: Union[str, pathlib.Path], bundled: bool):
self._cache = {}
self._filename = filename
self._bundled = bundled
self.data = self._load(filename, bundled, SPEC_VERSION_TUPLE[0])
def load(
self,
filename: Union[str, pathlib.Path],
bundled: bool,
parent: Union[str, pathlib.Path, None],
required_version: int,
):
"""Load a new file into the loader.
Parameters
----------
filename : Union[str, pathlib.Path]
Filename of the file to parse or name of the resource.
bundled : bool
Is the definition file bundled in pyvisa-sim.
parent : Union[str, pathlib.Path, None]
Path to directory in which the file can be found. If none the directory
in which the initial file was located.
required_version : int
Major required version.
"""
if self._bundled and not bundled:
msg = "Only other bundled files can be loaded from bundled files."
raise ValueError(msg)
if parent is None:
parent = self._filename
base = os.path.dirname(parent)
filename = os.path.join(base, filename)
return self._load(filename, bundled, required_version)
def get_device_dict(
self,
device: str,
filename: Union[str, pathlib.Path, None],
bundled: bool,
required_version: int,
):
"""Access a device definition.
Parameters
----------
device : str
Name of the device information to access.
filename : Union[str, pathlib.Path]
Filename of the file to parse or name of the resource.
The file must be located in the same directory as the original file.
bundled : bool
Is the definition file bundled in pyvisa-sim.
required_version : int
Major required version.
"""
if filename is None:
data = self.data
else:
data = self.load(filename, bundled, None, required_version)
return data["devices"][device]
# --- Private API
#: (absolute path / resource name / None, bundled) -> dict
_cache: Dict[Tuple[Union[str, pathlib.Path, None], bool], Dict[str, str]]
#: Path the first loaded file.
_filename: Union[str, pathlib.Path]
#: Is the loader working with bundled resources.
_bundled: bool
def _load(
self, filename: Union[str, pathlib.Path], bundled: bool, required_version: int
) -> Dict[str, Any]:
"""Load a YAML definition file.
The major version of the definition must match.
"""
if (filename, bundled) in self._cache:
return self._cache[(filename, bundled)]
if bundled:
assert isinstance(filename, str)
data = parse_resource(filename)
else:
data = parse_file(filename)
ver = _ver_to_tuple(data["spec"])[0]
if ver != required_version:
raise ValueError(
"Invalid version in %s (bundled = %s). "
"Expected %s, found %s," % (filename, bundled, required_version, ver)
)
self._cache[(filename, bundled)] = data
return data
def get_devices(filename: Union[str, pathlib.Path], bundled: bool) -> Devices:
"""Get a Devices object from a file.
Parameters
----------
filename : Union[str, pathlib.Path]
Full path of the file to parse or name of the resource.
bundled : bool
Is the definition file bundled in pyvisa-sim.
Returns
-------
Devices
Devices found in the definition file.
"""
loader = Loader(filename, bundled)
devices = Devices()
# Iterate through the resources and generate each individual device
# on demand.
for resource_name, resource_dict in loader.data.get("resources", {}).items():
device_name = resource_dict["device"]
dd = loader.get_device_dict(
device_name,
resource_dict.get("filename", None),
resource_dict.get("bundled", False),
SPEC_VERSION_TUPLE[0],
)
devices.add_device(
resource_name, get_device(device_name, dd, loader, resource_dict)
)
return devices
|