1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
|
"""Logtest, a unittest.TestCase helper for testing log output."""
import sys
import time
from uuid import UUID
import pytest
from cherrypy._cpcompat import text_or_bytes
try:
    # On Windows, msvcrt reads a single keypress without echoing it.
    import msvcrt

    def getchar():
        """Return one keypress from the console as text (Windows)."""
        # getwch() returns a str. getch() returns bytes on Python 3, which
        # can never match the str literals the callers compare against.
        return msvcrt.getwch()
except ImportError:
    # Unix: msvcrt is unavailable, so read one character in raw mode.
    import tty
    import termios

    def getchar():
        """Return one character read from stdin in raw mode (Unix)."""
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setraw(fd)
            ch = sys.stdin.read(1)
        finally:
            # Always restore the terminal, even if the read fails.
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
        return ch
class LogCase(object):
    """unittest.TestCase mixin for testing log messages.

    logfile: a filename for the desired log. Yes, I know modes are evil,
        but it makes the test functions so much cleaner to set this once.

    lastmarker: the last marker in the log. This can be used to search for
        messages since the last marker.

    markerPrefix: a bytes string with which to prefix log markers. This
        should be unique enough from normal log output to use for marker
        identification.

    interactive: when true, a failed assertion prompts on the console
        instead of failing the test immediately.
    """

    interactive = False
    logfile = None
    lastmarker = None
    markerPrefix = b'test suite marker: '

    @staticmethod
    def _to_bytes(value):
        """Return ``value`` UTF-8 encoded if it is a str, else unchanged."""
        if isinstance(value, str):
            return value.encode('utf-8')
        return value

    @staticmethod
    def _read_key():
        """Return one keypress from getchar() as text."""
        key = getchar()
        # Some getchar() implementations hand back bytes; the menu below
        # compares against str, so normalize here.
        if isinstance(key, bytes):
            key = key.decode('latin-1')
        return key

    def _handleLogError(self, msg, data, marker, pattern):
        """Report a failed log assertion.

        In non-interactive mode this fails the test via pytest.fail(msg).
        In interactive mode it prompts on stdout and lets the user show the
        log lines (``data``), the marker or the pattern, ignore the failure
        (plain return), raise it, or exit the process.
        """
        print('')
        print(' ERROR: %s' % msg)

        if not self.interactive:
            raise pytest.fail(msg)

        p = (' Show: '
             '[L]og [M]arker [P]attern; '
             '[I]gnore, [R]aise, or sys.e[X]it >> ')
        sys.stdout.write(p + ' ')
        # ARGH
        sys.stdout.flush()

        # console_height is not defined on this class (presumably it is
        # supplied by the class this mixin is combined with -- verify).
        # Fall back to a default so paging cannot raise AttributeError.
        page_height = getattr(self, 'console_height', 30)

        while True:
            i = self._read_key().upper()
            if i not in 'MPLIRX':
                continue
            print(i)  # Also prints new line
            if i == 'L':
                for x, line in enumerate(data):
                    if (x + 1) % page_height == 0:
                        # The \r and comma should make the next line overwrite
                        sys.stdout.write('<-- More -->\r ')
                        m = self._read_key().lower()
                        # Erase our "More" prompt
                        sys.stdout.write(' \r ')
                        if m == 'q':
                            break
                    print(line.rstrip())
            elif i == 'M':
                print(repr(marker or self.lastmarker))
            elif i == 'P':
                print(repr(pattern))
            elif i == 'I':
                # return without raising the normal exception
                return
            elif i == 'R':
                raise pytest.fail(msg)
            elif i == 'X':
                self.exit()
            sys.stdout.write(p + ' ')

    def exit(self):
        """Terminate the process via sys.exit()."""
        sys.exit()

    def emptyLog(self):
        """Overwrite self.logfile with 0 bytes."""
        # Opening in 'wb' mode truncates the file. Writing a str ('') to a
        # binary handle would raise TypeError, so write nothing at all.
        with open(self.logfile, 'wb'):
            pass

    def markLog(self, key=None):
        """Insert a marker line into the log and set self.lastmarker.

        key may be str or bytes; if None, the current time is used.
        """
        if key is None:
            key = str(time.time())
        self.lastmarker = key

        with open(self.logfile, 'ab+') as f:
            f.write(b'%s%s\n' % (self.markerPrefix, self._to_bytes(key)))

    def _read_marked_region(self, marker=None):
        """Return lines (bytes) from self.logfile in the marked region.

        If marker is None, self.lastmarker is used. If the log hasn't
        been marked (using self.markLog), the entire log will be
        returned.
        """
        logfile = self.logfile
        marker = marker or self.lastmarker
        if marker is None:
            with open(logfile, 'rb') as f:
                return f.readlines()

        marker = self._to_bytes(marker)
        data = []
        in_region = False
        with open(logfile, 'rb') as f:
            for line in f:
                if in_region:
                    # A marker line for a different key ends the region.
                    if (line.startswith(self.markerPrefix)
                            and marker not in line):
                        break
                    data.append(line)
                elif marker in line:
                    in_region = True
        return data

    def assertInLog(self, line, marker=None):
        """Fail if the given (partial) line is not in the log.

        The log will be searched from the given marker to the next
        marker. If marker is None, self.lastmarker is used. If the log
        hasn't been marked (using self.markLog), the entire log will be
        searched. ``line`` may be str (encoded as UTF-8) or bytes.
        """
        data = self._read_marked_region(marker)
        needle = self._to_bytes(line)
        if any(needle in logline for logline in data):
            return
        msg = '%r not found in log' % line
        self._handleLogError(msg, data, marker, line)

    def assertNotInLog(self, line, marker=None):
        """Fail if the given (partial) line is in the log.

        The log will be searched from the given marker to the next
        marker. If marker is None, self.lastmarker is used. If the log
        hasn't been marked (using self.markLog), the entire log will be
        searched. ``line`` may be str (encoded as UTF-8) or bytes.
        """
        data = self._read_marked_region(marker)
        needle = self._to_bytes(line)
        for logline in data:
            if needle in logline:
                msg = '%r found in log' % line
                self._handleLogError(msg, data, marker, line)

    def assertValidUUIDv4(self, marker=None):
        """Fail if no line in the marked region is a valid UUIDv4.

        The log will be searched from the given marker to the next
        marker. If marker is None, self.lastmarker is used. If the log
        hasn't been marked (using self.markLog), the entire log will be
        searched.
        """
        data = [
            chunk.decode('utf-8').rstrip('\n').rstrip('\r')
            for chunk in self._read_marked_region(marker)
        ]

        log_chunk = None  # stays None when the region is empty
        malformed = []
        for log_chunk in data:
            try:
                uuid_obj = UUID(log_chunk, version=4)
            except (TypeError, ValueError):
                continue  # it might be in another chunk
            # UUID(..., version=4) forces the version/variant bits, so a
            # round-trip mismatch means the text was not a canonical v4.
            if str(uuid_obj) == log_chunk:
                return
            malformed.append(log_chunk)

        for bad_chunk in malformed:
            msg = '%r is not a valid UUIDv4' % bad_chunk
            self._handleLogError(msg, data, marker, bad_chunk)

        msg = 'UUIDv4 not found in log'
        self._handleLogError(msg, data, marker, log_chunk)

    def assertLog(self, sliceargs, lines, marker=None):
        """Fail if 'lines' is not contained in log.readlines()[sliceargs].

        sliceargs is either an int (one log line is checked against a
        single expected fragment) or a (start, stop) pair (each expected
        fragment is checked against the corresponding log line).

        The log will be searched from the given marker to the next
        marker. If marker is None, self.lastmarker is used. If the log
        hasn't been marked (using self.markLog), the entire log will be
        searched.
        """
        data = self._read_marked_region(marker)
        if isinstance(sliceargs, int):
            # Single arg. Use __getitem__ and allow lines to be str or list.
            if isinstance(lines, (tuple, list)):
                lines = lines[0]
            lines = self._to_bytes(lines)
            if lines not in data[sliceargs]:
                msg = '%r not found on log line %r' % (lines, sliceargs)
                # Resolve negative indexes first; otherwise the context
                # slice wraps around (e.g. -1 would show data[0:5]).
                index = sliceargs
                if index < 0:
                    index += len(data)
                context = [data[index], '--EXTRA CONTEXT--']
                context += data[index + 1:index + 6]
                self._handleLogError(msg, context, marker, lines)
        else:
            # Multiple args. Use __getslice__ and require lines to be list.
            if isinstance(lines, tuple):
                lines = list(lines)
            elif isinstance(lines, text_or_bytes):
                raise TypeError("The 'lines' arg must be a list when "
                                "'sliceargs' is a tuple.")

            start, stop = sliceargs
            # NOTE(review): zip() stops at the shorter sequence, so expected
            # lines beyond the end of the log slice are not checked.
            for line, logline in zip(lines, data[start:stop]):
                line = self._to_bytes(line)
                if line not in logline:
                    msg = '%r not found in log' % line
                    self._handleLogError(msg, data[start:stop], marker, line)
|