from __future__ import unicode_literals
import textwrap
import markdown
import os
import logging
import collections
import unittest
from functools import wraps

try:
    # py>=3.2
    from tempfile import TemporaryDirectory
except ImportError:
    from backports.tempfile import TemporaryDirectory


from mkdocs import config
from mkdocs import utils


def dedent(text):
    """ Remove common leading indentation from `text`, then strip surrounding whitespace. """
    unindented = textwrap.dedent(text)
    return unindented.strip()


def get_markdown_toc(markdown_source):
    """
    Return the table of contents the Markdown parser builds for the given source.

    A fresh `markdown.Markdown` instance with the `toc` extension enabled is used;
    the converted output itself is discarded and only the parser's `toc`
    attribute is returned.
    """
    parser = markdown.Markdown(extensions=['toc'])
    # The conversion result is not needed; converting is what populates `parser.toc`.
    parser.convert(markdown_source)
    return parser.toc


def load_config(**cfg):
    """
    Helper to build a simple, validated config for testing.

    Every keyword is used as a config option. When `site_name`, `config_file_path`
    or `docs_dir` are not given, they default to 'Example' and to the `mkdocs.yml`
    file and `docs` directory of the `integration/minimal` project located next
    to this module.

    Returns the loaded config object. An AssertionError carrying the
    (errors, warnings) tuple is raised if validation reports anything.
    """
    fixture_root = os.path.join(
        os.path.abspath(os.path.dirname(__file__)), 'integration', 'minimal'
    )
    fallbacks = (
        ('site_name', 'Example'),
        ('config_file_path', os.path.join(fixture_root, 'mkdocs.yml')),
        # Point to an actual dir to avoid a 'does not exist' error on validation.
        ('docs_dir', os.path.join(fixture_root, 'docs')),
    )
    for option, value in fallbacks:
        cfg.setdefault(option, value)

    conf = config.Config(schema=config.DEFAULT_SCHEMA, config_file_path=cfg['config_file_path'])
    conf.load_dict(cfg)

    validation_result = conf.validate()
    assert validation_result == ([], []), validation_result
    return conf


def tempdir(files=None, **kw):
    """
    Decorate a test method so it runs against a prepopulated temporary directory.

    The directory and its files are created right before the wrapped function is called and are
    removed again as soon as the wrapped function returns. The path of the directory is passed to
    the wrapped function as the first positional argument after `self`.

    `files` may be a dict mapping file paths (relative to the temporary directory) to strings of
    file content, or a list/tuple of paths, in which case each file is created empty. Content is
    written UTF-8 encoded. All remaining keywords are handed to `TemporaryDirectory` when creating
    the parent directory; `prefix` defaults to 'mkdocs_test-' if not given.

    In the following example, two files are created in the temporary directory and then are
    destroyed when the function exits:

        @tempdir(files={
            'foo.txt': 'foo content',
            'bar.txt': 'bar content'
        })
        def example(self, tdir):
            assert os.path.isfile(os.path.join(tdir, 'foo.txt'))
            pth = os.path.join(tdir, 'bar.txt')
            assert os.path.isfile(pth)
            with io.open(pth, 'r', encoding='utf-8') as f:
                assert f.read() == 'bar content'
    """
    if isinstance(files, (list, tuple)):
        # A bare sequence of paths means "create these as empty files".
        files = {name: '' for name in files}
    elif not files:
        files = {}

    kw.setdefault('prefix', 'mkdocs_test-')

    def decorator(fn):
        @wraps(fn)
        def wrapper(self, *args):
            with TemporaryDirectory(**kw) as tmp_root:
                for rel_path, text in files.items():
                    target = os.path.join(tmp_root, rel_path)
                    utils.write_file(text.encode(encoding='utf-8'), target)
                # Return from inside the `with` so cleanup happens after `fn` finishes.
                return fn(self, tmp_root, *args)
        return wrapper
    return decorator


class PathAssertionMixin(object):
    """
    Assertion methods for testing paths.

    Intended to be mixed into a `unittest.TestCase` subclass: the methods rely on
    `assertEqual`, `_formatMessage` and `failureException` being available on `self`.

    Except for `assertPathsEqual`, each method accepts one or more strings, which
    are first joined using os.path.join.
    """

    def assertPathsEqual(self, a, b, msg=None):
        """
        Fail unless paths `a` and `b` are equal once backslashes are replaced by forward slashes.

        `msg` is an optional custom failure message forwarded to `assertEqual`.
        """
        self.assertEqual(a.replace('\\', '/'), b.replace('\\', '/'), msg)

    def assertPathExists(self, *parts):
        """ Fail unless the joined path exists. """
        path = os.path.join(*parts)
        if not os.path.exists(path):
            msg = self._formatMessage(None, "The path '{}' does not exist".format(path))
            raise self.failureException(msg)

    def assertPathNotExists(self, *parts):
        """ Fail if the joined path exists. """
        path = os.path.join(*parts)
        if os.path.exists(path):
            msg = self._formatMessage(None, "The path '{}' does exist".format(path))
            raise self.failureException(msg)

    def assertPathIsFile(self, *parts):
        """ Fail unless the joined path is an existing file. """
        path = os.path.join(*parts)
        if not os.path.isfile(path):
            msg = self._formatMessage(None, "The path '{}' is not a file that exists".format(path))
            raise self.failureException(msg)

    def assertPathNotFile(self, *parts):
        """ Fail if the joined path is an existing file. """
        path = os.path.join(*parts)
        if os.path.isfile(path):
            msg = self._formatMessage(None, "The path '{}' is a file that exists".format(path))
            raise self.failureException(msg)

    def assertPathIsDir(self, *parts):
        """ Fail unless the joined path is an existing directory. """
        path = os.path.join(*parts)
        if not os.path.isdir(path):
            msg = self._formatMessage(None, "The path '{}' is not a directory that exists".format(path))
            raise self.failureException(msg)

    def assertPathNotDir(self, *parts):
        """ Fail if the joined path is an existing directory. """
        path = os.path.join(*parts)
        # Must test for a directory (not a file) to be the inverse of assertPathIsDir.
        if os.path.isdir(path):
            msg = self._formatMessage(None, "The path '{}' is a directory that exists".format(path))
            raise self.failureException(msg)


# Backport unittest.TestCase.assertLogs for Python 2.7
# see https://github.com/python/cpython/blob/3.6/Lib/unittest/case.py

if not utils.PY3:
    # Pair of lists handed back to the user of ``assertLogs``: the raw records
    # and their formatted counterparts.
    _LoggingWatcher = collections.namedtuple("_LoggingWatcher",
                                             ["records", "output"])

    class _CapturingHandler(logging.Handler):
        """
        Logging handler that stores every record it receives, both raw and formatted.
        """

        def __init__(self):
            logging.Handler.__init__(self)
            self.watcher = _LoggingWatcher([], [])

        def flush(self):
            # Nothing is buffered, so there is nothing to flush.
            pass

        def emit(self, record):
            captured = self.watcher
            captured.records.append(record)
            captured.output.append(self.format(record))

    class _AssertLogsContext(object):
        """Context manager backing TestCase.assertLogs()."""

        LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"

        def __init__(self, test_case, logger_name, level):
            self.test_case = test_case
            self.logger_name = logger_name
            # Look the level up in logging._levelNames, falling back to the
            # value as given; a falsy level means INFO.
            self.level = logging._levelNames.get(level, level) if level else logging.INFO
            self.msg = None

        def __enter__(self):
            target = self.logger_name
            if not isinstance(target, logging.Logger):
                target = logging.getLogger(target)
            self.logger = target

            capture = _CapturingHandler()
            capture.setFormatter(logging.Formatter(self.LOGGING_FORMAT))
            self.watcher = capture.watcher

            # Remember the logger's current setup so __exit__ can restore it.
            self.old_handlers = list(target.handlers)
            self.old_level = target.level
            self.old_propagate = target.propagate

            # Route everything exclusively to the capturing handler.
            target.handlers = [capture]
            target.setLevel(self.level)
            target.propagate = False
            return capture.watcher

        def __exit__(self, exc_type, exc_value, tb):
            restored = self.logger
            restored.handlers = self.old_handlers
            restored.propagate = self.old_propagate
            restored.setLevel(self.old_level)
            if exc_type is not None:
                # let unexpected exceptions pass through
                return False
            if not self.watcher.records:
                self._raiseFailure(
                    "no logs of level {} or higher triggered on {}"
                    .format(logging.getLevelName(self.level), restored.name))

        def _raiseFailure(self, standardMsg):
            failure_text = self.test_case._formatMessage(self.msg, standardMsg)
            raise self.test_case.failureException(failure_text)

    class LogTestCase(unittest.TestCase):
        def assertLogs(self, logger=None, level=None):
            """Fail unless a log message of level *level* or higher is emitted
            on *logger* or its children.  If omitted, *level* defaults to
            INFO and *logger* defaults to the root logger.

            *logger* may be a logger name or a `logging.Logger` instance.

            This method must be used as a context manager, and will yield
            a recording object with two attributes: `output` and `records`.
            At the end of the context manager, the `output` attribute will
            be a list of the matching formatted log messages and the
            `records` attribute will be a list of the corresponding LogRecord
            objects.

            Example::
                with self.assertLogs('foo', level='INFO') as cm:
                    logging.getLogger('foo').info('first message')
                    logging.getLogger('foo.bar').error('second message')
                self.assertEqual(cm.output, ['INFO:foo:first message',
                                             'ERROR:foo.bar:second message'])
            """
            return _AssertLogsContext(self, logger, level)
else:
    # unittest.TestCase is used unchanged in this branch.
    LogTestCase = unittest.TestCase
