1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
import os
import re
import time
from datetime import datetime, timedelta
from io import SEEK_END
from typing import Any, List, Optional, Tuple
class HttpdErrorLog:
    """Checking the httpd error log for errors and warnings, including
    limiting checks from a recent known position forward.

    Two file positions are tracked:

    * ``_start_pos``  - where `get_missed()` begins reading. It is set to
      the end of the file at construction and after every `get_missed()`.
    * ``_recent_pos`` - where `ignore_recent()` and `scan_recent()` begin
      reading. It is advanced by `ignore_recent()` and `get_missed()`.

    Lines that `ignore_recent()` recognises as expected are remembered in
    the ``_caught_*`` sets, so that the next `get_missed()` does not report
    them. Lines covered by the permanent ignore lists
    (`add_ignored_matches()` / `add_ignored_lognos()`) are never reported.
    """

    # A log line carrying a "[module:warn]" / "[module:error]" level tag.
    RE_ERRLOG_WARN = re.compile(r'.*\[[^:]+:warn].*')
    RE_ERRLOG_ERROR = re.compile(r'.*\[[^:]+:error].*')
    # An error/warn line that also carries an "AHnnnn: " log number.
    RE_APLOGNO = re.compile(r'.*\[[^:]+:(error|warn)].* (?P<aplogno>AH\d+): .+')

    def __init__(self, path: str):
        """Start tracking the log file at `path`.

        If the file already exists, its current end is used as the start
        position, so content written before construction is not examined.
        """
        self._path = path
        self._ignored_matches = []
        self._ignored_lognos = set()
        # remember the file position we started with
        self._start_pos = 0
        if os.path.isfile(self._path):
            with open(self._path) as fd:
                self._start_pos = fd.seek(0, SEEK_END)
        self._recent_pos = self._start_pos
        self._reset_recent()
        self._reset_caught()

    def __repr__(self):
        return f"HttpdErrorLog[{self._path}, errors: {' '.join(self._recent_errors)}, " \
               f"warnings: {' '.join(self._recent_warnings)}]"

    @property
    def path(self) -> str:
        """The path of the log file being tracked."""
        return self._path

    def _reset_recent(self):
        """Empty the lists of recent errors/warnings shown by `__repr__`."""
        self._recent_errors = []
        self._recent_warnings = []

    def _reset_caught(self):
        """Forget all lines remembered as expected by `ignore_recent()`."""
        self._caught_errors = set()
        self._caught_warnings = set()
        self._caught_matches = set()

    def clear_log(self):
        """Truncate the log file (if it exists) and reset all positions
        and all recent/caught state. The ignore lists are kept.
        """
        if os.path.isfile(self.path):
            os.truncate(self.path, 0)
        self._start_pos = self._recent_pos = 0
        self._reset_recent()
        self._reset_caught()

    def _lookup_matches(self, line: str, matches: List[str]) -> bool:
        """Return True if `line` matches, from its start, any entry of
        `matches`. Entries may be pattern strings or compiled patterns.
        """
        return any(re.match(m, line) for m in matches)

    def _lookup_lognos(self, line: str, lognos: set) -> bool:
        """Return True if `line` is an error/warn line whose AH log
        number is contained in `lognos`.
        """
        if not lognos:
            return False
        m = self.RE_APLOGNO.match(line)
        return bool(m) and m.group('aplogno') in lognos

    def clear_ignored_matches(self):
        """Drop all permanently ignored patterns."""
        self._ignored_matches = []

    def add_ignored_matches(self, matches: List[str]):
        """Compile each regular expression in `matches` and permanently
        ignore all log lines matching one of them.
        """
        self._ignored_matches.extend(re.compile(m) for m in matches)

    def clear_ignored_lognos(self):
        """Drop all permanently ignored log numbers."""
        self._ignored_lognos = set()

    def add_ignored_lognos(self, lognos: List[str]):
        """Permanently ignore error/warn lines carrying one of `lognos`
        (strings such as "AH00123").
        """
        self._ignored_lognos.update(lognos)

    def _is_ignored(self, line: str) -> bool:
        """Return True if `line` is covered by the permanent ignore lists."""
        return (self._lookup_matches(line, self._ignored_matches)
                or self._lookup_lognos(line, self._ignored_lognos))

    def ignore_recent(self, lognos: Optional[List[str]] = None,
                      matches: Optional[List[str]] = None):
        """After a test case triggered errors/warnings on purpose, add
        those to our 'caught' list so they do not get reported as 'missed'.

        Reads the log from the recent position to its end, then moves the
        recent position there.

        :param lognos: AH log numbers of the expected errors/warnings.
        :param matches: regular expressions; any line matching one of them
            is expected, whatever its level.
        """
        # None defaults instead of shared mutable list defaults
        lognos_set = set(lognos) if lognos else set()
        if matches is None:
            matches = []
        self._reset_recent()
        if not os.path.isfile(self._path):
            return
        with open(self._path) as fd:
            fd.seek(self._recent_pos, os.SEEK_SET)
            for line in fd:
                if self._is_ignored(line):
                    continue
                if self._lookup_matches(line, matches):
                    self._caught_matches.add(line)
                elif self._lookup_lognos(line, lognos_set):
                    # warn is checked first, so a line carrying both tags
                    # is remembered as a warning
                    if self.RE_ERRLOG_WARN.match(line):
                        self._caught_warnings.add(line)
                    elif self.RE_ERRLOG_ERROR.match(line):
                        self._caught_errors.add(line)
            self._recent_pos = fd.tell()

    def get_missed(self) -> Tuple[List[str], List[str]]:
        """Collect the error and warning lines written since the start
        position that are neither permanently ignored nor caught by
        `ignore_recent()`.

        Afterwards the start and recent positions are moved to the end of
        the file and the caught sets are emptied, so every line is
        reported at most once.

        :return: tuple `(errors, warnings)` of raw log lines, each
            including its line ending.
        """
        errors: List[str] = []
        warnings: List[str] = []
        self._reset_recent()
        if os.path.isfile(self._path):
            with open(self._path) as fd:
                fd.seek(self._start_pos, os.SEEK_SET)
                for line in fd:
                    if self._is_ignored(line) or line in self._caught_matches:
                        continue
                    if self.RE_ERRLOG_WARN.match(line) \
                            and line not in self._caught_warnings:
                        warnings.append(line)
                        continue
                    if self.RE_ERRLOG_ERROR.match(line) \
                            and line not in self._caught_errors:
                        errors.append(line)
                self._start_pos = self._recent_pos = fd.tell()
        self._reset_caught()
        return errors, warnings

    def scan_recent(self, pattern: re.Pattern, timeout=10) -> bool:
        """Poll the log, from the recent position on, until a line
        matches `pattern` (matched from the start of the line).

        The recent position is not changed by this method.

        :param pattern: compiled regular expression to look for.
        :param timeout: seconds to keep polling before giving up.
        :return: True once a matching line is found; False right away if
            the log file does not exist.
        :raises TimeoutError: no line matched within `timeout` seconds.
        """
        if not os.path.isfile(self.path):
            return False
        # monotonic clock: the deadline is immune to wall clock changes
        deadline = time.monotonic() + timeout
        with open(self.path) as fd:
            while True:
                # re-read from the recent position to see appended lines
                fd.seek(self._recent_pos, os.SEEK_SET)
                if any(pattern.match(line) for line in fd):
                    return True
                if time.monotonic() > deadline:
                    raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
                time.sleep(.1)
|