1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
#!/usr/bin/python3
# -*- coding: utf-8 -*-
from subprocess import Popen, PIPE, call
import signal
class Alarm(Exception):
pass
class IOTest:
expectedRetCodeFail = False
def alarmHandler(self, signum, frame):
raise Alarm
def withTimeout(self, seconds, cmd, *args, **kwargs):
signal.signal(signal.SIGALRM, self.alarmHandler)
signal.alarm(seconds)
ret = cmd(*args, **kwargs)
signal.alarm(0) # reset
return ret
def communicateFlush(self, string):
self.proc.stdin.write(string.encode('utf-8') + b'\0')
self.proc.stdin.flush()
output = b''
c = None
while c != b'\0':
try:
c = self.withTimeout(2, self.proc.stdout.read, 1)
except Alarm:
break
if c != b'\0':
output += c
return output.decode('utf-8')
def checkCommand(self, cmd):
self.assertEqual(0, call(cmd))
def runCommand(self, cmd, pairs):
self.proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
blank = '[][\n]'
for inp, exp in pairs:
with self.subTest(input_line=inp):
self.assertEqual(exp+blank, self.communicateFlush(inp+blank))
self.proc.communicate() # let it terminate
self.proc.stdin.close()
self.proc.stdout.close()
self.proc.stderr.close()
retCode = self.proc.poll()
if self.expectedRetCodeFail:
self.assertNotEqual(0, retCode)
else:
self.assertEqual(0, retCode)
|