1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
|
from __future__ import unicode_literals
import textwrap
import markdown
import os
import logging
import collections
import unittest
from functools import wraps
try:
# py>=3.2
from tempfile import TemporaryDirectory
except ImportError:
from backports.tempfile import TemporaryDirectory
from mkdocs import config
from mkdocs import utils
def dedent(text):
return textwrap.dedent(text).strip()
def get_markdown_toc(markdown_source):
""" Return TOC generated by Markdown parser from Markdown source text. """
md = markdown.Markdown(extensions=['toc'])
md.convert(markdown_source)
return md.toc
def load_config(**cfg):
""" Helper to build a simple config for testing. """
path_base = os.path.join(
os.path.abspath(os.path.dirname(__file__)), 'integration', 'minimal'
)
cfg = cfg or {}
if 'site_name' not in cfg:
cfg['site_name'] = 'Example'
if 'config_file_path' not in cfg:
cfg['config_file_path'] = os.path.join(path_base, 'mkdocs.yml')
if 'docs_dir' not in cfg:
# Point to an actual dir to avoid a 'does not exist' error on validation.
cfg['docs_dir'] = os.path.join(path_base, 'docs')
conf = config.Config(schema=config.DEFAULT_SCHEMA, config_file_path=cfg['config_file_path'])
conf.load_dict(cfg)
errors_warnings = conf.validate()
assert(errors_warnings == ([], [])), errors_warnings
return conf
def tempdir(files=None, **kw):
"""
A decorator for building a temporary directory with prepopulated files.
The temporary directory and files are created just before the wrapped function is called and are destroyed
immediately after the wrapped function returns.
The `files` keyword should be a dict of file paths as keys and strings of file content as values.
If `files` is a list, then each item is assumed to be a path of an empty file. All other
keywords are passed to `tempfile.TemporaryDirectory` to create the parent directory.
In the following example, two files are created in the temporary directory and then are destroyed when
the function exits:
@tempdir(files={
'foo.txt': 'foo content',
'bar.txt': 'bar content'
})
def example(self, tdir):
assert os.path.isfile(os.path.join(tdir, 'foo.txt'))
pth = os.path.join(tdir, 'bar.txt')
assert os.path.isfile(pth)
with io.open(pth, 'r', encoding='utf-8') as f:
assert f.read() == 'bar content'
"""
files = {f: '' for f in files} if isinstance(files, (list, tuple)) else files or {}
if 'prefix' not in kw:
kw['prefix'] = 'mkdocs_test-'
def decorator(fn):
@wraps(fn)
def wrapper(self, *args):
with TemporaryDirectory(**kw) as td:
for path, content in files.items():
pth = os.path.join(td, path)
utils.write_file(content.encode(encoding='utf-8'), pth)
return fn(self, td, *args)
return wrapper
return decorator
class PathAssertionMixin(object):
"""
Assertion methods for testing paths.
Each method accepts one or more strings, which are first joined using os.path.join.
"""
def assertPathsEqual(self, a, b, msg=None):
self.assertEqual(a.replace('\\', '/'), b.replace('\\', '/'))
def assertPathExists(self, *parts):
path = os.path.join(*parts)
if not os.path.exists(path):
msg = self._formatMessage(None, "The path '{}' does not exist".format(path))
raise self.failureException(msg)
def assertPathNotExists(self, *parts):
path = os.path.join(*parts)
if os.path.exists(path):
msg = self._formatMessage(None, "The path '{}' does exist".format(path))
raise self.failureException(msg)
def assertPathIsFile(self, *parts):
path = os.path.join(*parts)
if not os.path.isfile(path):
msg = self._formatMessage(None, "The path '{}' is not a file that exists".format(path))
raise self.failureException(msg)
def assertPathNotFile(self, *parts):
path = os.path.join(*parts)
if os.path.isfile(path):
msg = self._formatMessage(None, "The path '{}' is a file that exists".format(path))
raise self.failureException(msg)
def assertPathIsDir(self, *parts):
path = os.path.join(*parts)
if not os.path.isdir(path):
msg = self._formatMessage(None, "The path '{}' is not a directory that exists".format(path))
raise self.failureException(msg)
def assertPathNotDir(self, *parts):
path = os.path.join(*parts)
if os.path.isfile(path):
msg = self._formatMessage(None, "The path '{}' is a directory that exists".format(path))
raise self.failureException(msg)
# Backport unittest.TestCase.assertLogs for Python 2.7
# see https://github.com/python/cpython/blob/3.6/Lib/unittest/case.py
if not utils.PY3:
_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
["records", "output"])
class _CapturingHandler(logging.Handler):
"""
A logging handler capturing all (raw and formatted) logging output.
"""
def __init__(self):
logging.Handler.__init__(self)
self.watcher = _LoggingWatcher([], [])
def flush(self):
pass
def emit(self, record):
self.watcher.records.append(record)
msg = self.format(record)
self.watcher.output.append(msg)
class _AssertLogsContext(object):
"""A context manager used to implement TestCase.assertLogs()."""
LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"
def __init__(self, test_case, logger_name, level):
self.test_case = test_case
self.logger_name = logger_name
if level:
self.level = logging._levelNames.get(level, level)
else:
self.level = logging.INFO
self.msg = None
def __enter__(self):
if isinstance(self.logger_name, logging.Logger):
logger = self.logger = self.logger_name
else:
logger = self.logger = logging.getLogger(self.logger_name)
formatter = logging.Formatter(self.LOGGING_FORMAT)
handler = _CapturingHandler()
handler.setFormatter(formatter)
self.watcher = handler.watcher
self.old_handlers = logger.handlers[:]
self.old_level = logger.level
self.old_propagate = logger.propagate
logger.handlers = [handler]
logger.setLevel(self.level)
logger.propagate = False
return handler.watcher
def __exit__(self, exc_type, exc_value, tb):
self.logger.handlers = self.old_handlers
self.logger.propagate = self.old_propagate
self.logger.setLevel(self.old_level)
if exc_type is not None:
# let unexpected exceptions pass through
return False
if len(self.watcher.records) == 0:
self._raiseFailure(
"no logs of level {} or higher triggered on {}"
.format(logging.getLevelName(self.level), self.logger.name))
def _raiseFailure(self, standardMsg):
msg = self.test_case._formatMessage(self.msg, standardMsg)
raise self.test_case.failureException(msg)
class LogTestCase(unittest.TestCase):
def assertLogs(self, logger=None, level=None):
"""Fail unless a log message of level *level* or higher is emitted
on *logger_name* or its children. If omitted, *level* defaults to
INFO and *logger* defaults to the root logger.
This method must be used as a context manager, and will yield
a recording object with two attributes: `output` and `records`.
At the end of the context manager, the `output` attribute will
be a list of the matching formatted log messages and the
`records` attribute will be a list of the corresponding LogRecord
objects.
Example::
with self.assertLogs('foo', level='INFO') as cm:
logging.getLogger('foo').info('first message')
logging.getLogger('foo.bar').error('second message')
self.assertEqual(cm.output, ['INFO:foo:first message',
'ERROR:foo.bar:second message'])
"""
return _AssertLogsContext(self, logger, level)
else:
LogTestCase = unittest.TestCase
|