import io
import os
import unittest
from test import support
from test.support import import_helper, os_helper, warnings_helper


_testcapi = import_helper.import_module('_testcapi')
_testlimitedcapi = import_helper.import_module('_testlimitedcapi')
_io = import_helper.import_module('_io')
NULL = None
STDOUT_FD = 1

with open(__file__, 'rb') as fp:
    FIRST_LINE = next(fp).decode()
FIRST_LINE_NORM = FIRST_LINE.rstrip() + '\n'


class CAPIFileTest(unittest.TestCase):
    def test_pyfile_fromfd(self):
        # Test PyFile_FromFd() which is a thin wrapper to _io.open()
        pyfile_fromfd = _testlimitedcapi.pyfile_fromfd
        filename = __file__
        with open(filename, "rb") as fp:
            fd = fp.fileno()

            # FileIO
            fp.seek(0)
            obj = pyfile_fromfd(fd, filename, "rb", 0, NULL, NULL, NULL, 0)
            try:
                self.assertIsInstance(obj, _io.FileIO)
                self.assertEqual(obj.readline(), FIRST_LINE.encode())
            finally:
                obj.close()

            # BufferedReader
            fp.seek(0)
            obj = pyfile_fromfd(fd, filename, "rb", 1024, NULL, NULL, NULL, 0)
            try:
                self.assertIsInstance(obj, _io.BufferedReader)
                self.assertEqual(obj.readline(), FIRST_LINE.encode())
            finally:
                obj.close()

            # TextIOWrapper
            fp.seek(0)
            obj = pyfile_fromfd(fd, filename, "r", 1,
                                "utf-8", "replace", NULL, 0)
            try:
                self.assertIsInstance(obj, _io.TextIOWrapper)
                self.assertEqual(obj.encoding, "utf-8")
                self.assertEqual(obj.errors, "replace")
                self.assertEqual(obj.readline(), FIRST_LINE_NORM)
            finally:
                obj.close()

    def test_pyfile_getline(self):
        # Test PyFile_GetLine(file, n): call file.readline()
        # and strip "\n" suffix if n < 0.
        pyfile_getline = _testlimitedcapi.pyfile_getline

        # Test Unicode
        with open(__file__, "r") as fp:
            fp.seek(0)
            self.assertEqual(pyfile_getline(fp, -1),
                             FIRST_LINE_NORM.rstrip('\n'))
            fp.seek(0)
            self.assertEqual(pyfile_getline(fp, 0),
                             FIRST_LINE_NORM)
            fp.seek(0)
            self.assertEqual(pyfile_getline(fp, 6),
                             FIRST_LINE_NORM[:6])

        # Test bytes
        with open(__file__, "rb") as fp:
            fp.seek(0)
            self.assertEqual(pyfile_getline(fp, -1),
                             FIRST_LINE.rstrip('\n').encode())
            fp.seek(0)
            self.assertEqual(pyfile_getline(fp, 0),
                             FIRST_LINE.encode())
            fp.seek(0)
            self.assertEqual(pyfile_getline(fp, 6),
                             FIRST_LINE.encode()[:6])

    def test_pyfile_writestring(self):
        # Test PyFile_WriteString(str, file): call file.write(str)
        writestr = _testlimitedcapi.pyfile_writestring

        with io.StringIO() as fp:
            self.assertEqual(writestr("a\xe9\u20ac\U0010FFFF".encode(), fp), 0)
            with self.assertRaises(UnicodeDecodeError):
                writestr(b"\xff", fp)
            with self.assertRaises(UnicodeDecodeError):
                writestr("\udc80".encode("utf-8", "surrogatepass"), fp)

            text = fp.getvalue()
            self.assertEqual(text, "a\xe9\u20ac\U0010FFFF")

        with self.assertRaises(SystemError):
            writestr(b"abc", NULL)

    def test_pyfile_writeobject(self):
        # Test PyFile_WriteObject(obj, file, flags):
        # - Call file.write(str(obj)) if flags equals Py_PRINT_RAW.
        # - Call file.write(repr(obj)) otherwise.
        writeobject = _testlimitedcapi.pyfile_writeobject
        Py_PRINT_RAW = 1

        with io.StringIO() as fp:
            # Test flags=Py_PRINT_RAW
            self.assertEqual(writeobject("raw", fp, Py_PRINT_RAW), 0)
            writeobject(NULL, fp, Py_PRINT_RAW)

            # Test flags=0
            self.assertEqual(writeobject("repr", fp, 0), 0)
            writeobject(NULL, fp, 0)

            text = fp.getvalue()
            self.assertEqual(text, "raw<NULL>'repr'<NULL>")

        # invalid file type
        for invalid_file in (123, "abc", object()):
            with self.subTest(file=invalid_file):
                with self.assertRaises(AttributeError):
                    writeobject("abc", invalid_file, Py_PRINT_RAW)

        with self.assertRaises(TypeError):
            writeobject("abc", NULL, 0)

    def test_pyobject_asfiledescriptor(self):
        # Test PyObject_AsFileDescriptor(obj):
        # - Return obj if obj is an integer.
        # - Return obj.fileno() otherwise.
        # File descriptor must be >= 0.
        asfd = _testlimitedcapi.pyobject_asfiledescriptor

        self.assertEqual(asfd(123), 123)
        self.assertEqual(asfd(0), 0)

        with open(__file__, "rb") as fp:
            self.assertEqual(asfd(fp), fp.fileno())

        # bool emits RuntimeWarning
        msg = r"bool is used as a file descriptor"
        with warnings_helper.check_warnings((msg, RuntimeWarning)):
            self.assertEqual(asfd(True), 1)

        class FakeFile:
            def __init__(self, fd):
                self.fd = fd
            def fileno(self):
                return self.fd

        # file descriptor must be positive
        with self.assertRaises(ValueError):
            asfd(-1)
        with self.assertRaises(ValueError):
            asfd(FakeFile(-1))

        # fileno() result must be an integer
        with self.assertRaises(TypeError):
            asfd(FakeFile("text"))

        # unsupported types
        for obj in ("string", ["list"], object()):
            with self.subTest(obj=obj):
                with self.assertRaises(TypeError):
                    asfd(obj)

        # CRASHES asfd(NULL)

    def test_pyfile_newstdprinter(self):
        # Test PyFile_NewStdPrinter()
        pyfile_newstdprinter = _testcapi.pyfile_newstdprinter

        file = pyfile_newstdprinter(STDOUT_FD)
        self.assertEqual(file.closed, False)
        self.assertIsNone(file.encoding)
        self.assertEqual(file.mode, "w")

        self.assertEqual(file.fileno(), STDOUT_FD)
        self.assertEqual(file.isatty(), os.isatty(STDOUT_FD))

        # flush() is a no-op
        self.assertIsNone(file.flush())

        # close() is a no-op
        self.assertIsNone(file.close())
        self.assertEqual(file.closed, False)

        support.check_disallow_instantiation(self, type(file))

    def test_pyfile_newstdprinter_write(self):
        # Test the write() method of PyFile_NewStdPrinter()
        pyfile_newstdprinter = _testcapi.pyfile_newstdprinter

        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        try:
            old_stdout = os.dup(STDOUT_FD)
        except OSError as exc:
            # os.dup(STDOUT_FD) is not supported on WASI
            self.skipTest(f"os.dup() failed with {exc!r}")

        try:
            with open(filename, "wb") as fp:
                # PyFile_NewStdPrinter() only accepts fileno(stdout)
                # or fileno(stderr) file descriptor.
                fd = fp.fileno()
                os.dup2(fd, STDOUT_FD)

                file = pyfile_newstdprinter(STDOUT_FD)
                self.assertEqual(file.write("text"), 4)
                # The surrogate character is encoded with
                # the "surrogateescape" error handler
                self.assertEqual(file.write("[\udc80]"), 8)
        finally:
            os.dup2(old_stdout, STDOUT_FD)
            os.close(old_stdout)

        with open(filename, "r") as fp:
            self.assertEqual(fp.read(), "text[\\udc80]")

    def test_py_fopen(self):
        # Test Py_fopen() and Py_fclose()
        py_fopen = _testcapi.py_fopen

        with open(__file__, "rb") as fp:
            source = fp.read()

        for filename in (__file__, os.fsencode(__file__)):
            with self.subTest(filename=filename):
                data = py_fopen(filename, "rb")
                self.assertEqual(data, source[:256])

                data = py_fopen(os_helper.FakePath(filename), "rb")
                self.assertEqual(data, source[:256])

        filenames = [
            os_helper.TESTFN,
            os.fsencode(os_helper.TESTFN),
        ]
        if os_helper.TESTFN_UNDECODABLE is not None:
            filenames.append(os_helper.TESTFN_UNDECODABLE)
            filenames.append(os.fsdecode(os_helper.TESTFN_UNDECODABLE))
        if os_helper.TESTFN_UNENCODABLE is not None:
            filenames.append(os_helper.TESTFN_UNENCODABLE)
        for filename in filenames:
            with self.subTest(filename=filename):
                try:
                    with open(filename, "wb") as fp:
                        fp.write(source)
                except OSError:
                    # TESTFN_UNDECODABLE cannot be used to create a file
                    # on macOS/WASI.
                    filename = None
                    continue
                try:
                    data = py_fopen(filename, "rb")
                    self.assertEqual(data, source[:256])
                finally:
                    os_helper.unlink(filename)

        # embedded null character/byte in the filename
        with self.assertRaises(ValueError):
            py_fopen("a\x00b", "rb")
        with self.assertRaises(ValueError):
            py_fopen(b"a\x00b", "rb")

        # non-ASCII mode failing with "Invalid argument"
        with self.assertRaises(OSError):
            py_fopen(__file__, b"\xc2\x80")
        with self.assertRaises(OSError):
            # \x98 is invalid in cp1250, cp1251, cp1257
            # \x9d is invalid in cp1252-cp1255, cp1258
            py_fopen(__file__, b"\xc2\x98\xc2\x9d")
        # UnicodeDecodeError can come from the audit hook code
        with self.assertRaises((UnicodeDecodeError, OSError)):
            py_fopen(__file__, b"\x98\x9d")

        # invalid filename type
        for invalid_type in (123, object()):
            with self.subTest(filename=invalid_type):
                with self.assertRaises(TypeError):
                    py_fopen(invalid_type, "rb")

        if support.MS_WINDOWS:
            with self.assertRaises(OSError):
                # On Windows, the file mode is limited to 10 characters
                py_fopen(__file__, "rt+, ccs=UTF-8")

        # CRASHES py_fopen(NULL, 'rb')
        # CRASHES py_fopen(__file__, NULL)

    def test_py_universalnewlinefgets(self):
        py_universalnewlinefgets = _testcapi.py_universalnewlinefgets
        filename = os_helper.TESTFN
        self.addCleanup(os_helper.unlink, filename)

        with open(filename, "wb") as fp:
            fp.write(b"line1\nline2")

        line = py_universalnewlinefgets(filename, 1000)
        self.assertEqual(line, b"line1\n")

        with open(filename, "wb") as fp:
            fp.write(b"line2\r\nline3")

        line = py_universalnewlinefgets(filename, 1000)
        self.assertEqual(line, b"line2\n")

        with open(filename, "wb") as fp:
            fp.write(b"line3\rline4")

        line = py_universalnewlinefgets(filename, 1000)
        self.assertEqual(line, b"line3\n")

    # PyFile_SetOpenCodeHook() and PyFile_OpenCode() are tested by
    # test_embed.test_open_code_hook()


if __name__ == "__main__":
    unittest.main()
