1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
|
#! /usr/bin/env python
#
# This file is part of pySerial - Cross platform serial port support for Python
# (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
"""\
Some tests for the serial module.
Part of pyserial (http://pyserial.sf.net) (C)2001-2015 cliechti@gmx.net
Intended to be run on different platforms, to ensure portability of
the code.
For all these tests a simple hardware is required.
Loopback HW adapter:
Shortcut these pin pairs:
TX <-> RX
RTS <-> CTS
DTR <-> DSR
On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
"""
import unittest
import threading
import time
import sys
import serial
# Port (device name or URL) on which the tests are performed. When this file
# is run as a script, the first command line argument replaces this default.
PORT = 'loop://'
# All 256 byte values in ascending order; used as the payload of the loopback
# tests. The indirection via bytearray is needed because bytes(range(256))
# does something else in Python 2.7.
bytes_0to255 = bytes(bytearray(range(256)))
def segments(data, size=16):
    """Lazily split *data* into consecutive slices of at most *size* items.

    The slices are produced in order; the final one is shorter when
    ``len(data)`` is not a multiple of *size*. Empty input yields nothing.
    """
    total = len(data)
    for start in range(0, total, size):
        stop = start + size
        yield data[start:stop]
class Test4_Nonblocking(unittest.TestCase):
    """Read/write checks on a port opened with ``timeout=0``.

    The ``timeout`` class attribute is the value handed to the port in
    ``setUp``; a subclass can override it to rerun the same tests with a
    different timeout setting.
    """

    timeout = 0

    def setUp(self):
        # every test gets its own freshly opened port
        self.s = serial.serial_for_url(PORT, timeout=self.timeout)

    def tearDown(self):
        self.s.close()

    def test0_Messy(self):
        """NonBlocking (timeout=0)"""
        # No assertions on purpose: this test only exists so that its
        # docstring appears in verbose mode, because Test3 and Test4
        # otherwise print the same messages.

    def test1_ReadEmpty(self):
        """timeout: After port open, the input buffer must be empty"""
        received = self.s.read(1)
        self.assertEqual(received, b'', "expected empty buffer")

    def test2_Loopback(self):
        """timeout: each sent character should return (binary test).
        this is also a test for the binary capability of a port."""
        port = self.s
        for chunk in segments(bytes_0to255):
            expected_count = len(chunk)
            port.write(chunk)
            # the data may need a moment before it can be read back
            # (especially on win32)
            time.sleep(0.05)
            self.assertEqual(
                port.in_waiting,
                expected_count,
                "expected exactly {} character for inWainting()".format(expected_count))
            # no custom message here: the default assertEqual output is used
            self.assertEqual(port.read(expected_count), chunk)
        leftover = port.read(1)
        self.assertEqual(leftover, b'', "expected empty buffer after all sent chars are read")

    def test2_LoopbackTimeout(self):
        """timeout: test the timeout/immediate return.
        partial results should be returned."""
        port = self.s
        port.write(b"HELLO")
        # the data may need a moment before it can be read back
        # (especially on win32 and rfc2217)
        time.sleep(0.1)
        # ask for more characters than are available so that the read has to
        # run into the timeout
        echoed = port.read(10)
        self.assertEqual(echoed, b'HELLO', "expected the 'HELLO' which was written before")
        leftover = port.read(1)
        self.assertEqual(leftover, b'', "expected empty buffer after all sent chars are read")
class Test3_Timeout(Test4_Nonblocking):
    """Rerun of the inherited Test4_Nonblocking tests with ``timeout=1``."""

    timeout = 1

    def test0_Messy(self):
        """Blocking (timeout=1)"""
        # No assertions on purpose: this test only exists so that its
        # docstring appears in verbose mode, because Test3 and Test4
        # otherwise print the same messages.
class SendEvent(threading.Thread):
    """Thread that writes a single ``b"E"`` marker to a port after a delay.

    The thread starts itself as soon as the object is constructed. Once
    *delay* seconds have passed it sets an internal event and, unless
    ``stop()`` was called in the meantime, writes ``b"E"`` to the port and
    flushes it.
    """

    def __init__(self, serial, delay=3):
        """Create and immediately start the sender thread.

        :param serial: object providing ``write()`` and ``flush()``; the
            marker byte is written to it.
        :param delay: number of seconds to sleep before the marker is sent.
        """
        threading.Thread.__init__(self)
        self.serial = serial
        self.delay = delay
        self.x = threading.Event()  # set once the delay has elapsed
        self.stopped = False
        self.start()

    def run(self):
        """Sleep for the delay, signal the event, then send the marker."""
        time.sleep(self.delay)
        # the event is set before the write, so a caller blocked in stop()
        # is released even when the marker is suppressed
        self.x.set()
        if not self.stopped:
            self.serial.write(b"E")
            self.serial.flush()

    def isSet(self):
        """Return True once the delay has elapsed (the event has been set)."""
        # Event.isSet() is a deprecated alias that warns on Python 3.10+;
        # is_set() is the supported spelling and exists on Python 2.6+ too.
        return self.x.is_set()

    def stop(self):
        """Suppress a not yet sent marker and wait until the delay is over.

        This blocks until the thread has set its event; it does not join
        the thread.
        """
        self.stopped = True
        self.x.wait()
class Test1_Forever(unittest.TestCase):
    """Blocking read on a port opened without a timeout.

    A read with ``timeout=None`` only returns once data arrives, so each
    test starts a SendEvent thread that writes a marker character to the
    port after some time; the loopback hardware returns it to the reader.
    """

    def setUp(self):
        self.s = serial.serial_for_url(PORT, timeout=None)
        # the sender thread starts running as soon as it is created
        self.event = SendEvent(self.s)

    def tearDown(self):
        # wait for the sender first, then release the port
        self.event.stop()
        self.s.close()

    def test2_ReadEmpty(self):
        """no timeout: after port open, the input buffer must be empty (read).
        a character is sent after some time to terminate the test (SendEvent)."""
        received = self.s.read(1)
        # success means: the read returned only after the sender fired, and
        # what it returned is the sender's marker
        if self.event.isSet() and received == b'E':
            return
        self.fail("expected marker (evt={!r}, c={!r})".format(self.event.isSet(), received))
class Test2_Forever(unittest.TestCase):
    """Checks on a port opened with ``timeout=None`` that never block.

    Only ``in_waiting`` and reads of data already written are used, so no
    helper thread is needed to terminate a read.
    """

    def setUp(self):
        self.s = serial.serial_for_url(PORT, timeout=None)

    def tearDown(self):
        self.s.close()

    def test1_inWaitingEmpty(self):
        """no timeout: after port open, the input buffer must be empty (in_waiting)"""
        pending = self.s.in_waiting
        self.assertEqual(pending, 0, "expected empty buffer")

    def test2_Loopback(self):
        """no timeout: each sent character should return (binary test).
        this is also a test for the binary capability of a port."""
        port = self.s
        for chunk in segments(bytes_0to255):
            expected_count = len(chunk)
            port.write(chunk)
            # the data may need a moment before it can be read back
            # (especially on win32 and rfc2217)
            time.sleep(0.05)
            # no custom messages here: the default assertEqual output is used
            self.assertEqual(port.in_waiting, expected_count)
            self.assertEqual(port.read(expected_count), chunk)
        pending = port.in_waiting
        self.assertEqual(pending, 0, "expected empty buffer after all sent chars are read")
class Test0_DataWires(unittest.TestCase):
    """Checks of the modem control lines.

    Each output line (RTS, DTR) is driven low and then high, and the input
    line expected to follow it (CTS, DSR) is read back after a pause.
    """

    def setUp(self):
        self.s = serial.serial_for_url(PORT)

    def tearDown(self):
        self.s.close()

    def test1_RTS(self):
        """Test RTS/CTS"""
        port = self.s
        port.rts = False
        time.sleep(1.1)  # let the line settle before reading it back
        cts_is_low = not port.cts
        self.assertTrue(cts_is_low, "CTS -> 0")
        port.rts = True
        time.sleep(1.1)
        cts_state = port.cts
        self.assertTrue(cts_state, "CTS -> 1")

    def test2_DTR(self):
        """Test DTR/DSR"""
        port = self.s
        port.dtr = False
        time.sleep(1.1)  # let the line settle before reading it back
        dsr_is_low = not port.dsr
        self.assertTrue(dsr_is_low, "DSR -> 0")
        port.dtr = True
        time.sleep(1.1)
        dsr_state = port.dsr
        self.assertTrue(dsr_state, "DSR -> 1")

    def test3_RI(self):
        """Test RI"""
        ri_is_low = not self.s.ri
        self.assertTrue(ri_is_low, "RI -> 0")
class Test_MoreTimeouts(unittest.TestCase):
    """Timeout checks that configure the port themselves before opening it."""

    def setUp(self):
        # the port object is created closed; the test sets it up and opens it
        self.s = serial.serial_for_url(PORT, do_not_open=True)

    def tearDown(self):
        port = self.s
        # discard whatever the test left pending before closing
        port.reset_output_buffer()
        port.flush()
        port.close()
        # reopen once with flow control disabled and drain the input:
        # some faulty USB-serial adapters make the next test fail otherwise
        port.timeout = 1
        port.xonxoff = False
        port.open()
        port.read(3000)
        port.close()

    def test_WriteTimeout(self):
        """Test write() timeout."""
        port = self.s
        # software flow control plus the loop-back adapter is used to put the
        # traffic on hold: the XOFF that is sent comes back to the port itself
        port.port = PORT
        port.write_timeout = 1.0
        port.xonxoff = True
        port.open()
        port.write(serial.XOFF)
        # some systems need a little delay so that they can react on XOFF
        time.sleep(0.5)
        started = time.time()
        with self.assertRaises(serial.SerialTimeoutException):
            port.write(b"timeout please" * 200)
        elapsed = time.time() - started
        self.assertTrue(
            0.9 <= elapsed < 2.1,
            "Timeout not in the given interval ({})".format(elapsed))
if __name__ == '__main__':
    # show the hardware requirements from the module docstring first
    sys.stdout.write(__doc__)
    cli_args = sys.argv[1:]
    if cli_args:
        # the first command line argument selects the port under test
        PORT = cli_args[0]
    sys.stdout.write("Testing port: {!r}\n".format(PORT))
    # hand unittest a clean argument list that only requests verbose output
    sys.argv[1:] = ['-v']
    # executed from the command line, the module runs all of its tests
    unittest.main()
|