from __future__ import print_function

import gdbremote_testcase
import random
import select
import socket
from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
from lldbgdbserverutils import Server
import lldbsuite.test.lldbplatformutil

if lldbplatformutil.getHostPlatform() == "windows":
    import ctypes
    import ctypes.wintypes
    from ctypes.wintypes import (BOOL, DWORD, HANDLE, LPCWSTR, LPDWORD, LPVOID)

    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

    PIPE_ACCESS_INBOUND = 1
    FILE_FLAG_FIRST_PIPE_INSTANCE = 0x00080000
    FILE_FLAG_OVERLAPPED = 0x40000000
    PIPE_TYPE_BYTE = 0
    PIPE_REJECT_REMOTE_CLIENTS = 8
    INVALID_HANDLE_VALUE = -1
    ERROR_ACCESS_DENIED = 5
    ERROR_IO_PENDING = 997


    class OVERLAPPED(ctypes.Structure):
        _fields_ = [("Internal", LPVOID), ("InternalHigh", LPVOID), ("Offset",
            DWORD), ("OffsetHigh", DWORD), ("hEvent", HANDLE)]

        def __init__(self):
            super(OVERLAPPED, self).__init__(Internal=0, InternalHigh=0,
                Offset=0, OffsetHigh=0, hEvent=None)
    LPOVERLAPPED = ctypes.POINTER(OVERLAPPED)

    CreateNamedPipe = kernel32.CreateNamedPipeW
    CreateNamedPipe.restype = HANDLE
    CreateNamedPipe.argtypes = (LPCWSTR, DWORD, DWORD, DWORD, DWORD, DWORD,
            DWORD, LPVOID)

    ConnectNamedPipe = kernel32.ConnectNamedPipe
    ConnectNamedPipe.restype = BOOL
    ConnectNamedPipe.argtypes = (HANDLE, LPOVERLAPPED)

    CreateEvent = kernel32.CreateEventW
    CreateEvent.restype = HANDLE
    CreateEvent.argtypes = (LPVOID, BOOL, BOOL, LPCWSTR)

    GetOverlappedResultEx = kernel32.GetOverlappedResultEx
    GetOverlappedResultEx.restype = BOOL
    GetOverlappedResultEx.argtypes = (HANDLE, LPOVERLAPPED, LPDWORD, DWORD,
        BOOL)

    ReadFile = kernel32.ReadFile
    ReadFile.restype = BOOL
    ReadFile.argtypes = (HANDLE, LPVOID, DWORD, LPDWORD, LPOVERLAPPED)

    CloseHandle = kernel32.CloseHandle
    CloseHandle.restype = BOOL
    CloseHandle.argtypes = (HANDLE,)

    class Pipe(object):
        def __init__(self, prefix):
            while True:
                self.name = "lldb-" + str(random.randrange(1e10))
                full_name = "\\\\.\\pipe\\" + self.name
                self._handle = CreateNamedPipe(full_name, PIPE_ACCESS_INBOUND |
                        FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
                        PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS, 1, 4096,
                        4096, 0, None)
                if self._handle != INVALID_HANDLE_VALUE:
                    break
                if ctypes.get_last_error() != ERROR_ACCESS_DENIED:
                    raise ctypes.WinError(ctypes.get_last_error())

            self._overlapped = OVERLAPPED()
            self._overlapped.hEvent = CreateEvent(None, True, False, None)
            result = ConnectNamedPipe(self._handle, self._overlapped)
            assert result == 0
            if ctypes.get_last_error() != ERROR_IO_PENDING:
                raise ctypes.WinError(ctypes.get_last_error())

        def finish_connection(self, timeout):
            if not GetOverlappedResultEx(self._handle, self._overlapped,
                    ctypes.byref(DWORD(0)), timeout*1000, True):
                raise ctypes.WinError(ctypes.get_last_error())

        def read(self, size, timeout):
            buf = ctypes.create_string_buffer(size)
            if not ReadFile(self._handle, ctypes.byref(buf), size, None,
                    self._overlapped):
                if ctypes.get_last_error() != ERROR_IO_PENDING:
                    raise ctypes.WinError(ctypes.get_last_error())
            read = DWORD(0)
            if not GetOverlappedResultEx(self._handle, self._overlapped,
                    ctypes.byref(read), timeout*1000, True):
                raise ctypes.WinError(ctypes.get_last_error())
            return buf.raw[0:read.value]

        def close(self):
            CloseHandle(self._overlapped.hEvent)
            CloseHandle(self._handle)


else:
    class Pipe(object):
        def __init__(self, prefix):
            self.name = os.path.join(prefix, "stub_port_number")
            os.mkfifo(self.name)
            self._fd = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)

        def finish_connection(self, timeout):
            pass

        def read(self, size, timeout):
            (readers, _, _) = select.select([self._fd], [], [], timeout)
            if self._fd not in readers:
                raise TimeoutError
            return os.read(self._fd, size)

        def close(self):
            os.close(self._fd)


class TestGdbRemoteConnection(gdbremote_testcase.GdbRemoteTestCaseBase):

    @skipIfRemote  # reverse connect is not a supported use case for now
    def test_reverse_connect(self):
        # Reverse connect is the default connection method.
        self.connect_to_debug_monitor()
        # Verify we can do the handshake.  If that works, we'll call it good.
        self.do_handshake()

    @skipIfRemote
    def test_named_pipe(self):
        family, type, proto, _, addr = socket.getaddrinfo(
            self.stub_hostname, 0, proto=socket.IPPROTO_TCP)[0]
        self.sock = socket.socket(family, type, proto)
        self.sock.settimeout(self.DEFAULT_TIMEOUT)

        self.addTearDownHook(lambda: self.sock.close())

        pipe = Pipe(self.getBuildDir())

        self.addTearDownHook(lambda: pipe.close())

        args = self.debug_monitor_extra_args
        if lldb.remote_platform:
            args += ["*:0"]
        else:
            args += ["localhost:0"]

        args += ["--named-pipe", pipe.name]

        server = self.spawnSubprocess(
            self.debug_monitor_exe,
            args,
            install_remote=False)

        pipe.finish_connection(self.DEFAULT_TIMEOUT)
        port = pipe.read(10, self.DEFAULT_TIMEOUT)
        # Trim null byte, convert to int
        addr = (addr[0], int(port[:-1]))
        self.sock.connect(addr)
        self._server = Server(self.sock, server)

        # Verify we can do the handshake.  If that works, we'll call it good.
        self.do_handshake()
