# Copyright (c) 2008-2012 testtools developers. See LICENSE for details.

import io
import os
import tempfile
import unittest

from testtools import TestCase
from testtools.compat import (
    _b,
)
from testtools.content import (
    attach_file,
    Content,
    content_from_file,
    content_from_stream,
    JSON,
    json_content,
    StackLinesContent,
    StacktraceContent,
    TracebackContent,
    text_content,
)
from testtools.content_type import (
    ContentType,
    UTF8_TEXT,
)
from testtools.matchers import (
    Equals,
    MatchesException,
    Raises,
    raises,
)
from testtools.tests.helpers import an_exc_info


raises_value_error = Raises(MatchesException(ValueError))


class TestContent(TestCase):
    def test___init___None_errors(self):
        self.assertThat(lambda: Content(None, None), raises_value_error)
        self.assertThat(
            lambda: Content(None, lambda: ["traceback"]), raises_value_error
        )
        self.assertThat(
            lambda: Content(ContentType("text", "traceback"), None), raises_value_error
        )

    def test___init___sets_ivars(self):
        content_type = ContentType("foo", "bar")
        content = Content(content_type, lambda: ["bytes"])
        self.assertEqual(content_type, content.content_type)
        self.assertEqual(["bytes"], list(content.iter_bytes()))

    def test___eq__(self):
        content_type = ContentType("foo", "bar")

        def one_chunk():
            return [_b("bytes")]

        def two_chunk():
            return [_b("by"), _b("tes")]

        content1 = Content(content_type, one_chunk)
        content2 = Content(content_type, one_chunk)
        content3 = Content(content_type, two_chunk)
        content4 = Content(content_type, lambda: [_b("by"), _b("te")])
        content5 = Content(ContentType("f", "b"), two_chunk)
        self.assertEqual(content1, content2)
        self.assertEqual(content1, content3)
        self.assertNotEqual(content1, content4)
        self.assertNotEqual(content1, content5)

    def test___repr__(self):
        content = Content(
            ContentType("application", "octet-stream"),
            lambda: [_b("\x00bin"), _b("ary\xff")],
        )
        self.assertIn("\\x00binary\\xff", repr(content))

    def test_iter_text_not_text_errors(self):
        content_type = ContentType("foo", "bar")
        content = Content(content_type, lambda: ["bytes"])
        self.assertThat(content.iter_text, raises_value_error)

    def test_iter_text_decodes(self):
        content_type = ContentType("text", "strange", {"charset": "utf8"})
        content = Content(content_type, lambda: ["bytes\xea".encode()])
        self.assertEqual(["bytes\xea"], list(content.iter_text()))

    def test_iter_text_default_charset_iso_8859_1(self):
        content_type = ContentType("text", "strange")
        text = "bytes\xea"
        iso_version = text.encode("ISO-8859-1")
        content = Content(content_type, lambda: [iso_version])
        self.assertEqual([text], list(content.iter_text()))

    def test_as_text(self):
        content_type = ContentType("text", "strange", {"charset": "utf8"})
        content = Content(content_type, lambda: ["bytes\xea".encode()])
        self.assertEqual("bytes\xea", content.as_text())

    def test_from_file(self):
        fd, path = tempfile.mkstemp()
        self.addCleanup(os.remove, path)
        os.write(fd, _b("some data"))
        os.close(fd)
        content = content_from_file(path, UTF8_TEXT, chunk_size=2)
        self.assertThat(
            list(content.iter_bytes()),
            Equals([_b("so"), _b("me"), _b(" d"), _b("at"), _b("a")]),
        )

    def test_from_nonexistent_file(self):
        directory = tempfile.mkdtemp()
        nonexistent = os.path.join(directory, "nonexistent-file")
        content = content_from_file(nonexistent)
        self.assertThat(content.iter_bytes, raises(IOError))

    def test_from_file_default_type(self):
        content = content_from_file("/nonexistent/path")
        self.assertThat(content.content_type, Equals(UTF8_TEXT))

    def test_from_file_eager_loading(self):
        fd, path = tempfile.mkstemp()
        os.write(fd, _b("some data"))
        os.close(fd)
        content = content_from_file(path, UTF8_TEXT, buffer_now=True)
        os.remove(path)
        self.assertThat("".join(content.iter_text()), Equals("some data"))

    def test_from_file_with_simple_seek(self):
        f = tempfile.NamedTemporaryFile()
        f.write(_b("some data"))
        f.flush()
        self.addCleanup(f.close)
        content = content_from_file(f.name, UTF8_TEXT, chunk_size=50, seek_offset=5)
        self.assertThat(list(content.iter_bytes()), Equals([_b("data")]))

    def test_from_file_with_whence_seek(self):
        f = tempfile.NamedTemporaryFile()
        f.write(_b("some data"))
        f.flush()
        self.addCleanup(f.close)
        content = content_from_file(
            f.name, UTF8_TEXT, chunk_size=50, seek_offset=-4, seek_whence=2
        )
        self.assertThat(list(content.iter_bytes()), Equals([_b("data")]))

    def test_from_stream(self):
        data = io.StringIO("some data")
        content = content_from_stream(data, UTF8_TEXT, chunk_size=2)
        self.assertThat(
            list(content.iter_bytes()), Equals(["so", "me", " d", "at", "a"])
        )

    def test_from_stream_default_type(self):
        data = io.StringIO("some data")
        content = content_from_stream(data)
        self.assertThat(content.content_type, Equals(UTF8_TEXT))

    def test_from_stream_eager_loading(self):
        fd, path = tempfile.mkstemp()
        self.addCleanup(os.remove, path)
        self.addCleanup(os.close, fd)
        os.write(fd, _b("some data"))
        stream = open(path, "rb")
        self.addCleanup(stream.close)
        content = content_from_stream(stream, UTF8_TEXT, buffer_now=True)
        os.write(fd, _b("more data"))
        self.assertThat("".join(content.iter_text()), Equals("some data"))

    def test_from_stream_with_simple_seek(self):
        data = io.BytesIO(_b("some data"))
        content = content_from_stream(data, UTF8_TEXT, chunk_size=50, seek_offset=5)
        self.assertThat(list(content.iter_bytes()), Equals([_b("data")]))

    def test_from_stream_with_whence_seek(self):
        data = io.BytesIO(_b("some data"))
        content = content_from_stream(
            data, UTF8_TEXT, chunk_size=50, seek_offset=-4, seek_whence=2
        )
        self.assertThat(list(content.iter_bytes()), Equals([_b("data")]))

    def test_from_text(self):
        data = "some data"
        expected = Content(UTF8_TEXT, lambda: [data.encode("utf8")])
        self.assertEqual(expected, text_content(data))

    def test_text_content_raises_TypeError_when_passed_bytes(self):
        data = _b("Some Bytes")
        self.assertRaises(TypeError, text_content, data)

    def test_text_content_raises_TypeError_when_passed_non_text(self):
        bad_values = (None, list(), dict(), 42, 1.23)
        for value in bad_values:
            self.assertThat(
                lambda: text_content(value),
                raises(
                    TypeError(
                        "text_content must be given text, not '%s'."
                        % type(value).__name__
                    )
                ),
            )

    def test_json_content(self):
        data = {"foo": "bar"}
        expected = Content(JSON, lambda: [_b('{"foo": "bar"}')])
        self.assertEqual(expected, json_content(data))


class TestStackLinesContent(TestCase):
    def _get_stack_line_and_expected_output(self):
        stack_lines = [
            ("/path/to/file", 42, "some_function", 'print("Hello World")'),
        ]
        expected = (
            '  File "/path/to/file", line 42, in some_function\n'
            '    print("Hello World")\n'
        )
        return stack_lines, expected

    def test_single_stack_line(self):
        stack_lines, expected = self._get_stack_line_and_expected_output()
        actual = StackLinesContent(stack_lines).as_text()

        self.assertEqual(expected, actual)

    def test_prefix_content(self):
        stack_lines, expected = self._get_stack_line_and_expected_output()
        prefix = self.getUniqueString() + "\n"
        content = StackLinesContent(stack_lines, prefix_content=prefix)
        actual = content.as_text()
        expected = prefix + expected

        self.assertEqual(expected, actual)

    def test_postfix_content(self):
        stack_lines, expected = self._get_stack_line_and_expected_output()
        postfix = "\n" + self.getUniqueString()
        content = StackLinesContent(stack_lines, postfix_content=postfix)
        actual = content.as_text()
        expected = expected + postfix

        self.assertEqual(expected, actual)

    def test___init___sets_content_type(self):
        stack_lines, expected = self._get_stack_line_and_expected_output()
        content = StackLinesContent(stack_lines)
        expected_content_type = ContentType(
            "text", "x-traceback", {"language": "python", "charset": "utf8"}
        )

        self.assertEqual(expected_content_type, content.content_type)


class TestTracebackContent(TestCase):
    def test___init___None_errors(self):
        self.assertThat(lambda: TracebackContent(None, None), raises_value_error)

    def test___init___sets_ivars(self):
        content = TracebackContent(an_exc_info, self)
        content_type = ContentType(
            "text", "x-traceback", {"language": "python", "charset": "utf8"}
        )
        self.assertEqual(content_type, content.content_type)
        result = unittest.TestResult()
        expected = result._exc_info_to_string(an_exc_info, self)
        self.assertEqual(expected, "".join(list(content.iter_text())))


class TestStacktraceContent(TestCase):
    def test___init___sets_ivars(self):
        content = StacktraceContent()
        content_type = ContentType(
            "text", "x-traceback", {"language": "python", "charset": "utf8"}
        )
        self.assertEqual(content_type, content.content_type)

    def test_prefix_is_used(self):
        prefix = self.getUniqueString()
        actual = StacktraceContent(prefix_content=prefix).as_text()
        self.assertTrue(actual.startswith(prefix))

    def test_postfix_is_used(self):
        postfix = self.getUniqueString()
        actual = StacktraceContent(postfix_content=postfix).as_text()
        self.assertTrue(actual.endswith(postfix))

    def test_top_frame_is_skipped_when_no_stack_is_specified(self):
        actual = StacktraceContent().as_text()
        self.assertNotIn("testtools/content.py", actual)


class TestAttachFile(TestCase):
    def make_file(self, data):
        # GZ 2011-04-21: This helper could be useful for methods above trying
        #                to use mkstemp, but should handle write failures and
        #                always close the fd. There must be a better way.
        fd, path = tempfile.mkstemp()
        self.addCleanup(os.remove, path)
        os.write(fd, _b(data))
        os.close(fd)
        return path

    def test_simple(self):
        class SomeTest(TestCase):
            def test_foo(self):
                pass

        test = SomeTest("test_foo")
        data = "some data"
        path = self.make_file(data)
        my_content = text_content(data)
        attach_file(test, path, name="foo")
        self.assertEqual({"foo": my_content}, test.getDetails())

    def test_optional_name(self):
        # If no name is provided, attach_file just uses the base name of the
        # file.
        class SomeTest(TestCase):
            def test_foo(self):
                pass

        test = SomeTest("test_foo")
        path = self.make_file("some data")
        base_path = os.path.basename(path)
        attach_file(test, path)
        self.assertEqual([base_path], list(test.getDetails()))

    def test_lazy_read(self):
        class SomeTest(TestCase):
            def test_foo(self):
                pass

        test = SomeTest("test_foo")
        path = self.make_file("some data")
        attach_file(test, path, name="foo", buffer_now=False)
        content = test.getDetails()["foo"]
        content_file = open(path, "w")
        content_file.write("new data")
        content_file.close()
        self.assertEqual("".join(content.iter_text()), "new data")

    def test_eager_read_by_default(self):
        class SomeTest(TestCase):
            def test_foo(self):
                pass

        test = SomeTest("test_foo")
        path = self.make_file("some data")
        attach_file(test, path, name="foo")
        content = test.getDetails()["foo"]
        content_file = open(path, "w")
        content_file.write("new data")
        content_file.close()
        self.assertEqual("".join(content.iter_text()), "some data")


def test_suite():
    from unittest import TestLoader

    return TestLoader().loadTestsFromName(__name__)
