File: assert_logs_backport.py

package info (click to toggle)
python-motor 3.7.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,572 kB
  • sloc: python: 12,252; javascript: 137; makefile: 74; sh: 8
file content (81 lines) | stat: -rw-r--r-- 2,623 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""Backport assertLogs from Python 3.4."""

import collections
import logging

# mypy: ignore-errors

_LoggingWatcher = collections.namedtuple("_LoggingWatcher", ["records", "output"])


class _BaseTestCaseContext:
    def __init__(self, test_case):
        self.test_case = test_case

    def _raiseFailure(self, standardMsg):
        msg = self.test_case._formatMessage(self.msg, standardMsg)
        raise self.test_case.failureException(msg)


class _CapturingHandler(logging.Handler):
    """Handler capturing all (raw and formatted) logging output."""

    def __init__(self):
        logging.Handler.__init__(self)
        self.watcher = _LoggingWatcher([], [])

    def flush(self):
        pass

    def emit(self, record):
        self.watcher.records.append(record)
        msg = self.format(record)
        self.watcher.output.append(msg)


class _AssertLogsContext(_BaseTestCaseContext):
    """A context manager used to implement TestCase.assertLogs()."""

    LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"

    def __init__(self, test_case, logger_name, level):
        assert isinstance(level, int)
        _BaseTestCaseContext.__init__(self, test_case)
        self.logger_name = logger_name
        self.level = level or logging.INFO
        self.level_name = logging.getLevelName(level)
        self.msg = None

    def __enter__(self):
        if isinstance(self.logger_name, logging.Logger):
            logger = self.logger = self.logger_name
        else:
            logger = self.logger = logging.getLogger(self.logger_name)
        formatter = logging.Formatter(self.LOGGING_FORMAT)
        handler = _CapturingHandler()
        handler.setFormatter(formatter)
        self.watcher = handler.watcher
        self.old_handlers = logger.handlers[:]
        self.old_level = logger.level
        self.old_propagate = logger.propagate
        logger.handlers = [handler]
        logger.setLevel(self.level)
        logger.propagate = False
        return handler.watcher

    def __exit__(self, exc_type, exc_value, tb):
        self.logger.handlers = self.old_handlers
        self.logger.propagate = self.old_propagate
        self.logger.setLevel(self.old_level)
        if exc_type is not None:
            # let unexpected exceptions pass through
            return False
        if len(self.watcher.records) == 0:
            self._raiseFailure(
                f"no logs of level {self.level_name} or higher triggered on {self.logger.name}"
            )


class AssertLogsMixin:
    def assertLogs(self, logger=None, level=None):
        return _AssertLogsContext(self, logger, level)