1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
|
#! /usr/bin/env python3
# Copyright © 2020, RedHat Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
# Authors:
# Benjamin Berg <bberg@redhat.com>
import os
import sys
import fcntl
import io
import re
import time
import threading
import select
import errno
class OutputChecker(object):
    """Capture output written to a pipe and let callers search it by line.

    A pipe is created on construction.  Whatever is written to its write
    end (see ``fd``) is read by a background daemon thread, split on
    ``b'\\n'`` into lines (kept as ``bytes``, without the newline) and
    also copied verbatim to ``out``.  The ``check_*`` methods consume the
    collected lines in order.

    Note: a trailing fragment that is not terminated by a newline stays in
    the internal partial buffer and is never added to the line list, even
    at EOF.
    """

    def __init__(self, out=sys.stdout):
        """Create the pipe and start the copier thread.

        out -- file object; its ``fileno()`` receives a copy of all data
               read from the pipe.
        """
        self._output = out
        self._pipe_fd_r, self._pipe_fd_w = os.pipe()
        self._eof = False
        self._partial_buf = b''
        # Released by the copier thread whenever new data (or EOF/an error)
        # arrives; the check_* methods block on it while waiting for lines.
        self._lines_sem = threading.Semaphore()
        self._lines = []
        self._reader_io = io.StringIO()

        # Just to be sure, shouldn't be a problem even if we didn't set it
        fcntl.fcntl(self._pipe_fd_r, fcntl.F_SETFL,
                    fcntl.fcntl(self._pipe_fd_r, fcntl.F_GETFL) | os.O_CLOEXEC | os.O_NONBLOCK)
        fcntl.fcntl(self._pipe_fd_w, fcntl.F_SETFL,
                    fcntl.fcntl(self._pipe_fd_w, fcntl.F_GETFL) | os.O_CLOEXEC)

        # Start copier thread
        self._thread = threading.Thread(target=self._copy, daemon=True)
        self._thread.start()

    def _copy(self):
        """Thread body: move data from the pipe into the line list and ``out``.

        Returns (ending the thread) on EOF or on any OSError other than
        EWOULDBLOCK, releasing the semaphore first so waiters wake up.
        """
        poller = select.poll()
        poller.register(self._pipe_fd_r)

        while True:
            try:
                # Be lazy and wake up occasionally in case _pipe_fd_r became invalid
                # The reason to do this is because os.read() will *not* return if the
                # FD is forcefully closed.
                # poll() takes its timeout in milliseconds, so this is 0.1s.
                poller.poll(100)

                r = os.read(self._pipe_fd_r, 1024)
                if not r:
                    self._eof = True
                    self._lines_sem.release()
                    return
            except OSError as e:
                # Nothing to read yet on the non-blocking FD, try again
                if e.errno == errno.EWOULDBLOCK:
                    continue

                # We get a bad file descriptor error when the outside closes the FD
                self._lines_sem.release()
                return

            # Everything but the last element is a complete line; the last
            # element is the (possibly empty) start of the next line.
            pieces = r.split(b'\n')
            pieces[0] = self._partial_buf + pieces[0]
            self._lines.extend(pieces[:-1])
            self._partial_buf = pieces[-1]

            self._lines_sem.release()

            os.write(self._output.fileno(), r)

    def check_line_re(self, needle_re, timeout=0, failmsg=None):
        """Consume lines until one matches the regular expression.

        needle_re -- pattern as ``bytes`` or ``str`` (``str`` is encoded
                     as ASCII); matched with ``search()``.
        timeout   -- seconds to keep waiting for further output.
        failmsg   -- optional text for the AssertionError.

        Returns the list of consumed lines, the matching one last.
        Raises AssertionError on EOF or timeout without a match.
        """
        deadline = time.time() + timeout

        if isinstance(needle_re, str):
            needle_re = needle_re.encode('ascii')

        r = re.compile(needle_re)
        ret = []

        while True:
            try:
                l = self._lines.pop(0)
            except IndexError:
                # EOF, throw error
                if self._eof:
                    if failmsg:
                        raise AssertionError("No further messages: %s" % failmsg)
                    else:
                        raise AssertionError('No client waiting for needle %s' % (str(needle_re)))

                # Check if should wake up
                if not self._lines_sem.acquire(timeout=deadline - time.time()):
                    if failmsg:
                        raise AssertionError(failmsg)
                    else:
                        raise AssertionError('Timed out waiting for needle %s (timeout: %0.2f)' % (str(needle_re), timeout))
                continue

            ret.append(l)
            if r.search(l):
                return ret

    def check_line(self, needle, timeout=0, failmsg=None):
        """Like check_line_re(), but ``needle`` is a literal substring."""
        if isinstance(needle, str):
            needle = needle.encode('ascii')

        needle_re = re.escape(needle)

        return self.check_line_re(needle_re, timeout=timeout, failmsg=failmsg)

    def check_no_line_re(self, needle_re, wait=0, failmsg=None):
        """Consume lines and fail if any matches the regular expression.

        needle_re -- pattern as ``bytes`` or ``str`` (``str`` is encoded
                     as ASCII); matched with ``search()``.
        wait      -- seconds to keep watching for further output.
        failmsg   -- optional text for the AssertionError.

        Returns the list of consumed lines once the wait expired or the
        read side was closed via force_close().
        Raises AssertionError if a matching line is found.
        """
        deadline = time.time() + wait

        if isinstance(needle_re, str):
            needle_re = needle_re.encode('ascii')

        r = re.compile(needle_re)
        ret = []

        while True:
            try:
                l = self._lines.pop(0)
            except IndexError:
                # EOF, so everything good
                if self._pipe_fd_r == -1:
                    break

                # Check if should wake up
                if not self._lines_sem.acquire(timeout=deadline - time.time()):
                    # Timed out, so everything is good
                    break
                continue

            ret.append(l)
            if r.search(l):
                if failmsg:
                    raise AssertionError(failmsg)
                else:
                    raise AssertionError('Found needle %s but shouldn\'t have been there (timeout: %0.2f)' % (str(needle_re), wait))

        return ret

    def check_no_line(self, needle, wait=0, failmsg=None):
        """Like check_no_line_re(), but ``needle`` is a literal substring."""
        if isinstance(needle, str):
            needle = needle.encode('ascii')

        needle_re = re.escape(needle)

        return self.check_no_line_re(needle_re, wait=wait, failmsg=failmsg)

    def clear(self):
        """Drop all collected lines and return them as a list."""
        ret = self._lines
        self._lines = []
        return ret

    def assert_closed(self, timeout=1):
        """Wait up to ``timeout`` seconds for the copier thread to finish.

        Raises AssertionError if the thread is still running afterwards.
        """
        self._thread.join(timeout)

        if self._thread.is_alive():
            raise AssertionError("OutputCheck: Write side has not been closed yet!")

    def force_close(self):
        """Close the read end of the pipe and wait for the copier thread."""
        fd = self._pipe_fd_r
        # Mark as closed first so the copier thread reads from an invalid FD
        self._pipe_fd_r = -1
        if fd >= 0:
            os.close(fd)

        self._thread.join()

    @property
    def fd(self):
        """Write end of the pipe (-1 once it has been closed here)."""
        return self._pipe_fd_w

    def writer_attached(self):
        """Close our copy of the write end; calling it again is a no-op."""
        if self._pipe_fd_w >= 0:
            os.close(self._pipe_fd_w)
        self._pipe_fd_w = -1

    def __del__(self):
        # getattr(): __init__ may have failed before the attributes were set
        if getattr(self, '_pipe_fd_r', -1) >= 0:
            os.close(self._pipe_fd_r)
            self._pipe_fd_r = -1

        if getattr(self, '_pipe_fd_w', -1) >= 0:
            os.close(self._pipe_fd_w)
            self._pipe_fd_w = -1
|