File: highlevel.py

package info (click to toggle)
pyvisa-sim 0.6.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 428 kB
  • sloc: python: 1,749; makefile: 129
file content (368 lines) | stat: -rw-r--r-- 12,179 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# -*- coding: utf-8 -*-
"""Simulated VISA Library.

:copyright: 2014 by PyVISA-sim Authors, see AUTHORS for more details.
:license: MIT, see LICENSE for more details.

"""
import random
from collections import OrderedDict
from traceback import format_exc
from typing import Any, Dict, SupportsInt, Tuple, Union, overload

import pyvisa.errors as errors
from pyvisa import constants, highlevel, rname
from pyvisa.typing import VISAEventContext, VISARMSession, VISASession
from pyvisa.util import LibraryPath

# This import is required to register subclasses
from . import parser
from .sessions import gpib, serial, tcpip, usb  # noqa
from .sessions.session import Session


class SimVisaLibrary(highlevel.VisaLibraryBase):
    """A pure Python backend for PyVISA.

    The object is basically a dispatcher with some common functions implemented.

    When a new resource object is requested to pyvisa, the library creates a Session
    object (that knows how to perform low-level communication operations) associated
    with a session handle (a number, usually referred just as session).

    A call to a library function is handled by PyVisaLibrary if it involves a resource
    agnostic function or dispatched to the correct session object (obtained from the
    session id).

    Importantly, the user is unaware of this. PyVisaLibrary behaves for the user
    just as an IVIVisaLibrary.

    """

    #: Maps session handle to session objects.
    sessions: Dict[VISASession, Session]

    @staticmethod
    def get_library_paths() -> Tuple[LibraryPath]:
        """List a dummy library path to allow to create the library."""
        return (LibraryPath("unset"),)

    @staticmethod
    def get_debug_info() -> Dict[str, str]:
        """Return a list of lines with backend info."""
        from . import __version__
        from .parser import SPEC_VERSION

        d = OrderedDict()
        d["Version"] = "%s" % __version__
        d["Spec version"] = SPEC_VERSION

        return d

    def _init(self) -> None:
        self.sessions: Dict[int, Session] = {}
        try:
            if self.library_path == "unset":
                self.devices = parser.get_devices("default.yaml", True)
            else:
                self.devices = parser.get_devices(self.library_path, False)
        except Exception as e:
            msg = "Could not parse definitions file. %r"
            raise type(e)(msg % format_exc())

    @overload
    def _register(self, obj: "SimVisaLibrary") -> VISARMSession:
        ...

    @overload
    def _register(self, obj: Session) -> VISASession:
        ...

    def _register(self, obj):
        """Creates a random but unique session handle for a session object.

        The handle is registered it in the sessions dictionary and returned.

        """
        session = None

        while session is None or session in self.sessions:
            session = random.randint(1000000, 9999999)

        self.sessions[session] = obj
        return session

    def open(
        self,
        session: VISARMSession,
        resource_name: str,
        access_mode: constants.AccessModes = constants.AccessModes.no_lock,
        open_timeout: SupportsInt = constants.VI_TMO_IMMEDIATE,
    ) -> Tuple[VISASession, constants.StatusCode]:
        """Opens a session to the specified resource.

        Corresponds to viOpen function of the VISA library.

        Parameters
        ----------
        sessions : VISARMSession
            Resource Manager session (should always be a session returned
            from open_default_resource_manager()).
        resource_name : str
            Unique symbolic name of a resource.
        access_mode : constants.AccessModes
            Specifies the mode by which the resource is to be accessed.
        open_timeout : int
            Specifies the maximum time period (in milliseconds) that this operation
            waits before returning an error.

        Returns
        -------
        VISASession
            Unique logical identifier reference to a session, return value of the
            library call.
        constants.StatusCode
            Status code describing the operation execution.

        """

        try:
            open_timeout = int(open_timeout)
        except ValueError:
            raise ValueError(
                "open_timeout (%r) must be an integer (or compatible type)"
                % open_timeout
            )

        try:
            parsed = rname.parse_resource_name(resource_name)
        except rname.InvalidResourceName:
            return VISASession(0), constants.StatusCode.error_invalid_resource_name

        # Loops through all session types, tries to parse the resource name and if ok, open it.
        cls = Session.get_session_class(
            parsed.interface_type_const, parsed.resource_class
        )

        sess = cls(session, resource_name, parsed)

        try:
            r_name = sess.attrs[constants.ResourceAttribute.resource_name]
            assert isinstance(r_name, str)
            sess.device = self.devices[r_name]
        except KeyError:
            return VISASession(0), constants.StatusCode.error_resource_not_found

        return self._register(sess), constants.StatusCode.success

    def close(
        self, session: Union[VISASession, VISARMSession, VISAEventContext]
    ) -> constants.StatusCode:
        """Closes the specified session, event, or find list.

        Corresponds to viClose function of the VISA library.

        Parameters
        ----------
        session : Union[VISASession, VISARMSession, VISAEventContext]
            Unique logical identifier to a session, event, or find list.

        Returns
        -------
        constants.StatusCode
            Return value of the library call.

        """
        try:
            del self.sessions[session]  # type: ignore
            return constants.StatusCode.success
        except KeyError:
            return constants.StatusCode.error_invalid_object

    def open_default_resource_manager(
        self,
    ) -> Tuple[VISARMSession, constants.StatusCode]:
        """This function returns a session to the Default Resource Manager resource.

        Corresponds to viOpenDefaultRM function of the VISA library.

        Returns
        -------
        VISARMSession
            Unique logical identifier to a Default Resource Manager session, return
            value of the library call.
        constants.StatusCode
            Return value of the library call.

        """
        return self._register(self), constants.StatusCode.success

    def list_resources(
        self, session: VISARMSession, query: str = "?*::INSTR"
    ) -> Tuple[str, ...]:
        """Returns a tuple of all connected devices matching query.

        Parameters
        ----------
        session : VISARMSession
            Resource manager session
        query : str
            VISA regular expression used to match devices.

        """
        # For each session type, ask for the list of connected resources and merge
        # them into a single list.
        resources = self.devices.list_resources()

        resources = rname.filter(resources, query)

        if resources:
            return resources

        raise errors.VisaIOError(errors.StatusCode.error_resource_not_found.value)

    def read(
        self, session: VISASession, count: int
    ) -> Tuple[bytes, constants.StatusCode]:
        """Reads data from device or interface synchronously.

        Corresponds to viRead function of the VISA library.

        Parameters
        ----------
        session : VISASession
            Unique logical identifier to a session.
        count : int
            Number of bytes to be read.

        Returns
        -------
        bytes
            Date read
        constants.StatusCode
            Return value of the library call.

        """

        try:
            sess = self.sessions[session]
        except KeyError:
            return b"", constants.StatusCode.error_invalid_object

        try:
            # We have an explicit except AttributeError
            chunk, status = sess.read(count)  # type: ignore
            if status == constants.StatusCode.error_timeout:
                raise errors.VisaIOError(constants.VI_ERROR_TMO)
            return chunk, status
        except AttributeError:
            return b"", constants.StatusCode.error_nonsupported_operation

    def write(
        self, session: VISASession, data: bytes
    ) -> Tuple[int, constants.StatusCode]:
        """Writes data to device or interface synchronously.

        Corresponds to viWrite function of the VISA library.

        Parameters
        ----------
        session : VISASession
            Unique logical identifier to a session.
        data : bytes
            Data to be written.

        Returns
        -------
        int
            Number of bytes actually transferred
        constants.StatusCode
            Return value of the library call.

        """

        try:
            sess = self.sessions[session]
        except KeyError:
            return 0, constants.StatusCode.error_invalid_object

        try:
            # We have an explicit except AttributeError
            return sess.write(data)  # type: ignore
        except AttributeError:
            return 0, constants.StatusCode.error_nonsupported_operation

    def get_attribute(
        self,
        session: Union[VISASession, VISARMSession, VISAEventContext],
        attribute: Union[constants.ResourceAttribute, constants.EventAttribute],
    ) -> Tuple[Any, constants.StatusCode]:
        """Retrieves the state of an attribute.

        Corresponds to viGetAttribute function of the VISA library.

        Parameters
        ----------
        session : Union[VISASession, VISARMSession, VISAEventContext]
            Unique logical identifier to a session, event, or find list.
        attribute : Union[constants.ResourceAttribute, constants.EventAttribute]
            Resource attribute for which the state query is made (see Attributes.*)

        Returns
        -------
        Any
            State of the queried attribute for a specified resource
        constants.StatusCode
            Return value of the library call.

        """
        try:
            sess = self.sessions[session]  # type: ignore
        except KeyError:
            return 0, constants.StatusCode.error_invalid_object

        # Not sure how to handle events yet and I do not want to error if people keep
        # using the bare attribute values.
        return sess.get_attribute(attribute)  # type: ignore

    def set_attribute(
        self,
        session: Union[VISASession, VISARMSession, VISAEventContext],
        attribute: Union[constants.ResourceAttribute, constants.EventAttribute],
        attribute_state: Any,
    ) -> constants.StatusCode:
        """Sets the state of an attribute.

        Corresponds to viSetAttribute function of the VISA library.

        Parameters
        ----------
        session : Union[VISASession, VISARMSession, VISAEventContext]
            Unique logical identifier to a session.
        attribute : Union[constants.ResourceAttribute, constants.EventAttribute]
            Attribute for which the state is to be modified. (Attributes.*)
        attribute_state : Any
            The state of the attribute to be set for the specified object.

        Returns
        -------
        constants.StatusCode
            Return value of the library call.

        """
        try:
            sess = self.sessions[session]  # type: ignore
        except KeyError:
            return constants.StatusCode.error_invalid_object

        # Not sure how to handle events yet and I do not want to error if people keep
        # using the bare attribute values.
        return sess.set_attribute(attribute, attribute_state)  # type: ignore

    def disable_event(self, session, event_type, mechanism):
        # TODO: implement this for GPIB finalization
        pass

    def discard_events(self, session, event_type, mechanism):
        # TODO: implement this for GPIB finalization
        pass