File: usbraw.py

package info (click to toggle)
pyvisa-py 0.7.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 604 kB
  • sloc: python: 4,833; makefile: 140; sh: 4
file content (81 lines) | stat: -rw-r--r-- 2,325 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# -*- coding: utf-8 -*-
"""Implements Session to control USB Raw devices

Loosely based on PyUSBTMC:python module to handle
USB-TMC(Test and Measurement class) devices. by Noboru Yamamot, Accl. Lab, KEK, JAPAN

This file is an offspring of the Lantz Project.

:copyright: 2014-2020 by PyVISA-py Authors, see AUTHORS for more details.
:license: MIT, see LICENSE for more details.

"""

from .usbtmc import USBRaw as USBRaw
from .usbutil import find_devices, find_interfaces


def find_raw_devices(
    vendor=None, product=None, serial_number=None, custom_match=None, **kwargs
):
    """Find connected USB RAW devices. See usbutil.find_devices for more info."""

    def is_usbraw(dev):
        if custom_match and not custom_match(dev):
            return False
        return bool(find_interfaces(dev, bInterfaceClass=0xFF, bInterfaceSubClass=0xFF))

    return find_devices(vendor, product, serial_number, is_usbraw, **kwargs)


class USBRawDevice(USBRaw):
    RECV_CHUNK = 1024**2

    find_devices = staticmethod(find_raw_devices)

    def __init__(self, vendor=None, product=None, serial_number=None, **kwargs):
        super(USBRawDevice, self).__init__(vendor, product, serial_number, **kwargs)

        if not (self.usb_recv_ep and self.usb_send_ep):
            raise ValueError(
                "USBRAW device must have both Bulk-In and Bulk-out endpoints."
            )

    def write(self, data):
        """Send raw bytes to the instrument.

        :param data: bytes to be sent to the instrument
        :type data: bytes
        """

        begin, end, size = 0, 0, len(data)
        bytes_sent = 0

        raw_write = super(USBRawDevice, self).write

        while not end > size:
            begin = end
            end = begin + self.RECV_CHUNK
            bytes_sent += raw_write(data[begin:end])

        return bytes_sent

    def read(self, size):
        """Read raw bytes from the instrument.

        :param size: amount of bytes to be sent to the instrument
        :type size: integer
        :return: received bytes
        :return type: bytes
        """

        raw_read = super(USBRawDevice, self).read

        received = bytearray()

        while not len(received) >= size:
            resp = raw_read(self.RECV_CHUNK)

            received.extend(resp)

        return bytes(received)