1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
#
# This file is part of the PyMeasure package.
#
# Copyright (c) 2013-2024 PyMeasure Developers
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
import logging
import serial
from .adapter import Adapter
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())
class SerialAdapter(Adapter):
""" Adapter class for using the Python Serial package to allow
serial communication to instrument
:param port: Serial port
:param preprocess_reply: An optional callable used to preprocess strings
received from the instrument. The callable returns the processed string.
.. deprecated:: 0.11
Implement it in the instrument's `read` method instead.
:param write_termination: String appended to messages before writing them.
:param read_termination: String expected at end of read message and removed.
:param \\**kwargs: Any valid key-word argument for serial.Serial
"""
def __init__(self, port, preprocess_reply=None,
write_termination="", read_termination="",
**kwargs):
super().__init__(preprocess_reply=preprocess_reply)
if isinstance(port, serial.SerialBase):
self.connection = port
else:
self.connection = serial.Serial(port, **kwargs)
self.write_termination = write_termination
self.read_termination = read_termination
def _write(self, command, **kwargs):
"""Write a string command to the instrument appending `write_termination`.
:param str command: Command string to be sent to the instrument
(without termination).
:param \\**kwargs: Keyword arguments for the connection itself.
"""
command += self.write_termination
self._write_bytes(command.encode(), **kwargs)
def _write_bytes(self, content, **kwargs):
"""Write the bytes `content` to the instrument.
:param bytes content: The bytes to write to the instrument.
:param \\**kwargs: Keyword arguments for the connection itself.
"""
self.connection.write(content, **kwargs)
def _read(self, **kwargs):
"""Read up to (excluding) `read_termination` or the whole read buffer.
:param \\**kwargs: Keyword arguments for the connection itself.
:returns str: ASCII response of the instrument (read_termination is removed first).
"""
read = self._read_bytes(-1, break_on_termchar=True, **kwargs).decode()
# Python>3.8 this shorter form is possible:
# self._read_bytes(-1).decode().removesuffix(self.read_termination)
if self.read_termination:
return read.split(self.read_termination)[0]
else:
return read
def _read_bytes(self, count, break_on_termchar, **kwargs):
"""Read a certain number of bytes from the instrument.
:param int count: Number of bytes to read. A value of -1 indicates to
read from the whole read buffer (waits for timeout).
:param bool break_on_termchar: Stop reading at a termination character.
:param \\**kwargs: Keyword arguments for the connection itself.
:returns bytes: Bytes response of the instrument (including termination).
"""
if break_on_termchar and self.read_termination:
return self.connection.read_until(self.read_termination.encode(),
count if count > 0 else None,
**kwargs)
elif count >= 0:
return self.connection.read(count, **kwargs)
else:
# For -1 we empty the buffer completely
return self._read_bytes_until_timeout()
def _read_bytes_until_timeout(self, chunk_size=256, **kwargs):
"""Read from the serial until a timeout occurs, regardless of the number of bytes.
:chunk_size: The number of bytes attempted to in a single transaction.
Multiple of these transactions will occur.
"""
# `Serial.readlines()` has an unpredictable timeout, see PR #866
data = bytes()
while True:
chunk = self.connection.read(chunk_size, **kwargs)
data += chunk
if len(chunk) < chunk_size: # If fewer bytes got returned, we had a timeout
return data
def flush_read_buffer(self):
"""Flush and discard the input buffer."""
self.connection.reset_input_buffer()
def __repr__(self):
return "<SerialAdapter(port='%s')>" % self.connection.port
|