''' Test receiving, primarily RawConnection and LircdConnection. '''
import asyncio
import os
import os.path
import subprocess
import sys
import time
import unittest
# Run from the directory containing this file so that the relative paths
# used below (the socket, 'lircrc.conf', './dummy-server') resolve against it.
testdir = os.path.abspath(os.path.dirname(__file__))
os.chdir(testdir)
# Put the parent directory first on sys.path so the `lirc` imports that
# follow can be resolved from there.
sys.path.insert(0, os.path.abspath(os.path.join(testdir, '..')))
from lirc import RawConnection, LircdConnection, CommandConnection
from lirc import AsyncConnection
import lirc
import signal
from contextlib import contextmanager, suppress
from concurrent.futures import TimeoutError
# Line echoed by the socat/echo fake server in the single-line tests.
_PACKET_ONE = '0123456789abcdef 00 KEY_1 mceusb'
# First line expected from ./dummy-server in the bulk raw test; the last
# expected line is derived from it by replacing " 00 " with " 09 ".
_LINE_0 = '0123456789abcdef 00 KEY_1 mceusb'
# Socket path, relative to the test directory (the cwd set above).
_SOCKET = 'lircd.socket'
# Locations of the external helper programs as reported by `which`.
# These run at import time: a non-zero exit from `which` makes
# check_output raise CalledProcessError before any test is collected.
_SOCAT = subprocess.check_output('which socat', shell=True) \
    .decode('ascii').strip()
_EXPECT = subprocess.check_output('which expect', shell=True) \
    .decode('ascii').strip()
class TimeoutException(Exception):
    ''' Raised by the SIGALRM handler when a guarded block runs too long.

    This is the test module's own exception type, defined here and
    separate from the name lirc.TimeoutException used elsewhere.
    '''
@contextmanager
def event_loop(suppress=()):
    ''' Yield an asyncio event loop and close it when the block exits.

    The current event loop is reused when there is one and it is still
    open; otherwise a new loop is created and installed as the current
    loop. While the block runs, a custom exception handler collects the
    exceptions reported to the loop.

    Parameters:
      - suppress: a single Exception subclass, or a list/tuple of them.
        Reported exceptions that are instances of any of these types
        are ignored instead of collected.

    Raises:
      - ValueError if suppress is neither an exception type nor a
        list/tuple made up solely of exception types.
      - Exception (chained to the first collected exception) on exit if
        any non-suppressed exception was reported to the loop.
    '''
    if isinstance(suppress, type) and issubclass(suppress, Exception):
        suppress = (suppress,)
    elif isinstance(suppress, (list, tuple)):
        for ex_type in suppress:
            if not isinstance(ex_type, type) or not issubclass(ex_type, Exception):
                raise ValueError('suppress is not an array of exception types')
        # isinstance() wants a tuple; this also snapshots the caller's list.
        suppress = tuple(suppress)
    else:
        raise ValueError('suppress is not an exception type')

    unhandled = []

    def exception_handler(loop, context):
        ''' Collect reported exceptions unless their type is suppressed. '''
        exception = context.get('exception')
        if exception is None:
            # The 'exception' key is optional in asyncio handler contexts
            # (some reports carry only a message). Indexing it would raise
            # KeyError inside the handler, so log such reports through the
            # default handler instead.
            loop.default_exception_handler(context)
            return
        if not isinstance(exception, suppress):
            unhandled.append(exception)

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions raise when no current loop is set.
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.set_exception_handler(exception_handler)

    try:
        yield loop
    finally:
        loop.close()
        if unhandled:
            raise Exception('Unhandled exceptions in async code') from unhandled[0]
def _wait_for_socket():
    ''' Block until the dummy lircd.socket created by socat shows up.

    Polls for the socket path up to 101 times, sleeping 10 ms after
    each miss, and raises OSError if it never appears.
    '''
    for _attempt in range(101):
        if os.path.exists(_SOCKET):
            return
        time.sleep(0.01)
    raise OSError('Cannot find socket file')
class ReceiveTests(unittest.TestCase):
    ''' Test various Connections.

    Every test starts a socat process listening on the dummy socket and
    serving either a fixed echo, the expect-driven ./dummy-server script
    or a plain "sleep 1", then talks to it through one of the lirc
    connection classes.
    '''

    @contextmanager
    def assertCompletedBeforeTimeout(self, timeout):
        ''' Fail the test if the guarded block runs longer than timeout.

        Installs a SIGALRM handler and arms signal.alarm(timeout), so
        timeout is in whole seconds and this only works where SIGALRM
        is available. When the alarm fires, the handler cancels the
        tasks of the running event loop (if any) and raises
        TimeoutException to break out of the guarded block.

        Raises self.failureException if the timeout was triggered.
        '''
        triggered = False

        def handle_timeout(signum, frame):
            nonlocal triggered
            triggered = True
            # asyncio.Task.all_tasks() was removed in Python 3.9. The
            # module-level replacement raises RuntimeError when no loop
            # is running, which is the case for the synchronous callers.
            try:
                pending = asyncio.all_tasks()
            except RuntimeError:
                pending = ()
            for task in pending:
                task.cancel()
            raise TimeoutException()

        try:
            signal.signal(signal.SIGALRM, handle_timeout)
            signal.alarm(timeout)
            with suppress(TimeoutException, asyncio.CancelledError):
                yield
        finally:
            # Cancel the alarm before restoring the default disposition:
            # a SIGALRM delivered under SIG_DFL would kill the process.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, signal.SIG_DFL)
            if triggered:
                raise self.failureException('Code block did not complete before the {} seconds timeout'.format(timeout)) from None

    def testReceiveOneRawLine(self):
        ''' Receive a single, raw line. '''
        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        cmd = [_SOCAT, 'UNIX-LISTEN:' + _SOCKET,
               'EXEC:"echo %s"' % _PACKET_ONE]
        with subprocess.Popen(cmd):
            _wait_for_socket()
            with RawConnection(socket_path=_SOCKET) as conn:
                line = conn.readline()
            self.assertEqual(line, _PACKET_ONE)

    def testReceive10000RawLines(self):
        ''' Receive 10000 raw lines. '''
        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        cmd = [_SOCAT, 'UNIX-LISTEN:' + _SOCKET,
               'EXEC:"%s ./dummy-server 0"' % _EXPECT]
        with subprocess.Popen(cmd,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT):
            _wait_for_socket()
            with RawConnection(socket_path=_SOCKET) as conn:
                lines = [conn.readline() for _ in range(10000)]
            self.assertEqual(lines[0], _LINE_0)
            self.assertEqual(lines[9999], _LINE_0.replace(" 00 ", " 09 "))

    def testReceiveOneLine(self):
        ''' Receive a single, translated line OK. '''
        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        cmd = [_SOCAT, 'UNIX-LISTEN:' + _SOCKET,
               'EXEC:"echo %s"' % _PACKET_ONE]
        with subprocess.Popen(cmd):
            _wait_for_socket()
            with LircdConnection('foo',
                                 socket_path=_SOCKET,
                                 lircrc_path='lircrc.conf') as conn:
                line = conn.readline()
            self.assertEqual(line, 'foo-cmd')

    def testReceive1AsyncLines(self):
        ''' Receive 1000 lines using the async interface. '''

        async def get_lines(raw_conn, count, loop):
            # Appends to the enclosing test's `lines` list until `count`
            # keypresses have been collected.
            async with AsyncConnection(raw_conn, loop) as conn:
                async for keypress in conn:
                    lines.append(keypress)
                    if len(lines) >= count:
                        return lines

        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        cmd = [_SOCAT, 'UNIX-LISTEN:' + _SOCKET,
               'EXEC:"%s ./dummy-server 0"' % _EXPECT]
        lines = []
        with subprocess.Popen(cmd,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT):
            _wait_for_socket()
            with LircdConnection('foo',
                                 socket_path=_SOCKET,
                                 lircrc_path='lircrc.conf') as conn:
                with event_loop() as loop:
                    loop.run_until_complete(get_lines(conn, 1000, loop))
            self.assertEqual(len(lines), 1000)
            self.assertEqual(lines[0], 'foo-cmd')
            self.assertEqual(lines[999], 'foo-cmd')

    def testReceiveTimeout(self):
        ''' Generate a TimeoutException if there is no data '''
        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        cmd = [_SOCAT, 'UNIX-LISTEN:' + _SOCKET, 'EXEC:"sleep 1"']
        with subprocess.Popen(cmd):
            _wait_for_socket()
            with LircdConnection('foo',
                                 socket_path=_SOCKET,
                                 lircrc_path='lircrc.conf') as conn:
                # The 0.1 timeout is shorter than the server's "sleep 1".
                self.assertRaises(lirc.TimeoutException, conn.readline, 0.1)

    def testReceiveDisconnect(self):
        ''' Generate a ConnectionResetError if connection is lost '''
        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        cmd = [_SOCAT, 'UNIX-LISTEN:' + _SOCKET, 'EXEC:"sleep 1"']
        with subprocess.Popen(cmd):
            _wait_for_socket()
            with LircdConnection('foo',
                                 socket_path=_SOCKET,
                                 lircrc_path='lircrc.conf') as conn:
                with self.assertRaises(ConnectionResetError):
                    with self.assertCompletedBeforeTimeout(3):
                        conn.readline(2)

    def testReceiveAsyncDisconnectDontBlock(self):
        ''' Do not block the loop if connection is lost '''

        async def readline(raw_conn):
            # `loop` is the enclosing test's variable, bound by the
            # event_loop() block before this coroutine runs.
            async with AsyncConnection(raw_conn, loop) as conn:
                return await conn.readline()

        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        cmd = [_SOCAT, 'UNIX-LISTEN:' + _SOCKET, 'EXEC:"sleep 1"']
        with subprocess.Popen(cmd):
            _wait_for_socket()
            with LircdConnection('foo',
                                 socket_path=_SOCKET,
                                 lircrc_path='lircrc.conf') as conn:
                with event_loop(suppress=[ConnectionResetError, TimeoutException]) as loop:
                    with self.assertCompletedBeforeTimeout(3):
                        # asyncio.wait_for raises asyncio.TimeoutError,
                        # which is a different class from the imported
                        # concurrent.futures.TimeoutError on Python
                        # 3.8-3.10, so both are listed.
                        with suppress(TimeoutError, asyncio.TimeoutError,
                                      ConnectionResetError):
                            loop.run_until_complete(asyncio.wait_for(readline(conn), 2))

    def testReceiveAsyncExceptionReraises(self):
        ''' Async readline should reraise if an exception occurs during select loop '''

        async def readline(raw_conn):
            # `loop` is the enclosing test's variable, bound by the
            # event_loop() block before this coroutine runs.
            async with AsyncConnection(raw_conn, loop) as conn:
                return await conn.readline()

        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        cmd = [_SOCAT, 'UNIX-LISTEN:' + _SOCKET, 'EXEC:"sleep 1"']
        with subprocess.Popen(cmd):
            _wait_for_socket()
            with LircdConnection('foo',
                                 socket_path=_SOCKET,
                                 lircrc_path='lircrc.conf') as conn:
                with event_loop(suppress=[ConnectionResetError, TimeoutException]) as loop:
                    with self.assertCompletedBeforeTimeout(2):
                        with self.assertRaises(ConnectionResetError):
                            loop.run_until_complete(readline(conn))

    def testReceiveAsyncExceptionEndsIterator(self):
        ''' Async iterator should stop if an exception occurs in the select loop '''

        async def get_lines(raw_conn):
            # `loop` is the enclosing test's variable, bound by the
            # event_loop() block before this coroutine runs.
            async with AsyncConnection(raw_conn, loop) as conn:
                async for keypress in conn:
                    pass

        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        cmd = [_SOCAT, 'UNIX-LISTEN:' + _SOCKET, 'EXEC:"sleep 1"']
        with subprocess.Popen(cmd):
            _wait_for_socket()
            with LircdConnection('foo',
                                 socket_path=_SOCKET,
                                 lircrc_path='lircrc.conf') as conn:
                with event_loop(suppress=[ConnectionResetError, TimeoutException]) as loop:
                    with self.assertCompletedBeforeTimeout(2):
                        self.assertIsNone(loop.run_until_complete(get_lines(conn)))
class CommandTests(unittest.TestCase):
    ''' Exercise Command, Reply and ReplyParser via a few sample commands. '''

    def testRemotesCommmand(self):
        ''' Run LIST with no arguments and check the parsed reply. '''
        server_cmd = [
            _SOCAT,
            'UNIX-LISTEN:' + _SOCKET,
            'EXEC:"%s ./dummy-server 100"' % _EXPECT,
        ]
        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        server = subprocess.Popen(server_cmd,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT)
        with server:
            _wait_for_socket()
            with CommandConnection(socket_path=_SOCKET) as conn:
                command = lirc.ListRemotesCommand(conn)
                reply = command.run()
            self.assertEqual(len(reply.data), 2)
            self.assertEqual(reply.success, True)
            for index, remote in enumerate(('mceusb1', 'mceusb2')):
                self.assertEqual(reply.data[index], remote)
            self.assertEqual(reply.sighup, False)

    def testSighupReply(self):
        ''' Cope with an unexpected SIGHUP inside a SEND_STOP reply. '''
        server_cmd = [
            _SOCAT,
            'UNIX-LISTEN:' + _SOCKET,
            'EXEC:"%s ./dummy-server 100"' % _EXPECT,
        ]
        if os.path.exists(_SOCKET):
            os.unlink(_SOCKET)
        server = subprocess.Popen(server_cmd,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT)
        with server:
            _wait_for_socket()
            with CommandConnection(socket_path=_SOCKET) as conn:
                command = lirc.StopRepeatCommand(conn, 'mceusb', 'KEY_1')
                reply = command.run()
            self.assertEqual(len(reply.data), 0)
            self.assertEqual(reply.success, True)
            self.assertEqual(reply.sighup, True)
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|