File: pytest_checklogs.py

package info (click to toggle)
python-dugong 3.7%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 516 kB
  • ctags: 346
  • sloc: python: 2,672; makefile: 23; sh: 1
file content (130 lines) | stat: -rw-r--r-- 4,235 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#!/usr/bin/env python3
'''
py.test plugin to look for suspicious phrases in messages
emitted on stdout/stderr or via the logging module.

False positives can be registered via a new `reg_output`
fixture (for messages to stdout/stderr), and an `assert_logs`
context manager (for logging messages).
'''

import pytest
import re
import functools
import sys
import logging
from contextlib import contextmanager

def pytest_configure(config):
    '''Refuse to load unless the catchlog plugin is registered

    Raises `ImportError` if the plugin manager of *config* does not
    know a plugin named ``pytest_catchlog``.
    '''

    plugin_manager = config.pluginmanager
    if plugin_manager.hasplugin('pytest_catchlog'):
        return
    raise ImportError('pytest catchlog plugin not found')

def check_test_log(caplog):
    '''Fail tests if they result in log messages of severity WARNING or more

    Every entry of ``caplog.records`` is inspected. Records that carry a
    true ``checklogs_ignore`` attribute are skipped; any other record
    with a level of at least `logging.WARNING` raises `AssertionError`.
    '''

    def is_unexpected(rec):
        if rec.levelno < logging.WARNING:
            return False
        return not getattr(rec, 'checklogs_ignore', False)

    if any(is_unexpected(rec) for rec in caplog.records):
        raise AssertionError('Logger received warning messages')

class CountMessagesHandler(logging.Handler):
    '''Logging handler that only counts the records passed to `emit`

    The running total is available as the *count* attribute; the
    records themselves are discarded.
    '''

    def __init__(self, level=logging.NOTSET):
        logging.Handler.__init__(self, level)
        # Number of records emitted so far
        self.count = 0

    def emit(self, record):
        '''Count *record* (its contents are not looked at)'''
        self.count = self.count + 1

@contextmanager
def assert_logs(pattern, level=logging.WARNING, count=None):
    '''Assert that suite emits specified log message

    *pattern* is matched (using `re.search`) against the *unformatted* log
    message, i.e. before any arguments are merged. Only records whose level
    is exactly *level* are considered. Messages that are neither `str` nor
    `bytes` (e.g. exception instances passed directly to a logging call)
    are converted with `str` before matching.

    If *count* is not None, raise `AssertionError` unless exactly *count*
    matching messages are caught. This check is only performed if the
    managed block completes without raising, so that an exception from
    the block itself is never replaced by the count failure.

    Matched log records will also be flagged (by setting their
    ``checklogs_ignore`` attribute) so that the caplog fixture does not
    generate exceptions for them (no matter their severity).
    '''

    def _matches(record):
        if record.levelno != level:
            return False
        msg = record.msg
        # logging accepts arbitrary objects as message; re.search() would
        # raise TypeError for them from inside the logging call.
        if not isinstance(msg, (str, bytes)):
            msg = str(msg)
        if re.search(pattern, msg):
            record.checklogs_ignore = True
            return True
        return False

    handler = CountMessagesHandler()
    handler.setLevel(level)
    handler.addFilter(_matches)
    logger = logging.getLogger()
    logger.addHandler(handler)
    try:
        yield
    finally:
        # Always detach the handler, even if the managed block failed
        logger.removeHandler(handler)

    # Only reached if the managed block did not raise
    if count is not None and handler.count != count:
        raise AssertionError('Expected to catch %d %r messages, but got %d'
                             % (count, pattern, handler.count))

def check_test_output(capfd, item):
    '''Fail if captured stdout/stderr contains suspicious phrases

    The captured text is obtained from ``capfd.readouterr()`` and written
    back to `sys.stdout` / `sys.stderr` so that it is still printed.

    False positives registered in ``item.checklogs_fp`` (a sequence of
    ``(pattern, flags, count)`` tuples; may be absent) are removed first.
    For each entry at most *count* occurrences are removed in total,
    stdout first, then stderr. A *count* of zero removes all occurrences
    from both streams.

    Raises `AssertionError` if the remaining text contains one of the
    suspicious words (matched case-insensitively as whole words).
    '''

    (stdout, stderr) = capfd.readouterr()

    # Write back what we've read (so that it will still be printed)
    sys.stdout.write(stdout)
    sys.stderr.write(stderr)

    # Strip out false positives
    for (pattern, flags, count) in getattr(item, 'checklogs_fp', ()):
        cp = re.compile(pattern, flags)
        (stdout, cnt) = cp.subn('', stdout, count=count)
        if count == 0:
            # Zero means "no limit". Do not pass `0 - cnt` here: a
            # negative count would leave stderr unstripped.
            stderr = cp.sub('', stderr)
        elif count > cnt:
            # Spend whatever is left of the budget on stderr
            stderr = cp.sub('', stderr, count=count - cnt)

    for pattern in ('exception', 'error', 'warning', 'fatal', 'traceback',
                    'fault', 'crash(?:ed)?', 'abort(?:ed)?', 'fishy'):
        cp = re.compile(r'\b{}\b'.format(pattern), re.IGNORECASE | re.MULTILINE)
        hit = cp.search(stderr)
        if hit:
            raise AssertionError('Suspicious output to stderr (matched "%s")' % hit.group(0))
        hit = cp.search(stdout)
        if hit:
            raise AssertionError('Suspicious output to stdout (matched "%s")' % hit.group(0))

def register_output(item, pattern, count=1, flags=re.MULTILINE):
    '''Register *pattern* as false positive for output checking

    A ``(pattern, flags, count)`` tuple is appended to the existing
    ``item.checklogs_fp`` list, which keeps the test from failing
    because its output otherwise looks suspicious.
    '''

    entry = (pattern, flags, count)
    false_positives = item.checklogs_fp
    false_positives.append(entry)

@pytest.fixture()
def reg_output(request):
    '''Fixture returning a callable that registers output false positives

    An empty ``checklogs_fp`` list is attached to ``request.node`` (which
    must not have that attribute yet), and `register_output` is returned
    with that node already bound as its first argument.
    '''

    node = request.node
    assert not hasattr(node, 'checklogs_fp')
    node.checklogs_fp = []
    return functools.partial(register_output, node)

def check_output(item):
    '''Run the output check and the log check for *item*

    The object stored in the ``_capturing`` attribute of the
    ``capturemanager`` plugin is handed to `check_test_output` in place
    of a capfd fixture, and ``item.catch_log_handler`` is handed to
    `check_test_log`.
    '''

    capture_manager = item.config.pluginmanager.getplugin('capturemanager')
    # NOTE(review): _capturing is a private attribute of the capture
    # manager; presumably it offers readouterr() -- verify against the
    # pytest version in use.
    check_test_output(capture_manager._capturing, item)
    check_test_log(item.catch_log_handler)

@pytest.hookimpl(trylast=True)
def pytest_runtest_setup(item):
    '''Run `check_output` for *item* from the setup hook

    Registered with ``trylast=True``, i.e. pytest is asked to call this
    implementation after the other implementations of the hook.
    '''

    check_output(item)
@pytest.hookimpl(trylast=True)
def pytest_runtest_call(item):
    '''Run `check_output` for *item* from the call hook

    Registered with ``trylast=True``, i.e. pytest is asked to call this
    implementation after the other implementations of the hook.
    '''

    check_output(item)
@pytest.hookimpl(trylast=True)
def pytest_runtest_teardown(item, nextitem):
    '''Run `check_output` for *item* from the teardown hook

    *nextitem* is accepted as part of the hook signature but not used.
    Registered with ``trylast=True``, i.e. pytest is asked to call this
    implementation after the other implementations of the hook.
    '''

    check_output(item)