#
# Copyright (c) ZeroC, Inc. All rights reserved.
#

import Ice, Test, sys, threading, time, traceback

def test(b):
    if not b:
        raise RuntimeError('test assertion failed')

class LoggerI(Ice.Logger):
    def __init__(self):
        self._started = False
        self._messages = []
        self.m = threading.Lock()

    def start(self):
        with self.m:
            self._started = True
            self.dump()

    def _print(self, msg):
        with self.m:
            self._messages.append(msg)
            if self._started:
                self.dump()

    def trace(self, category, msg):
        with self.m:
            self._messages.append("[" + category + "] " + msg)
            if self._started:
                self.dump()

    def warning(self, msg):
        with self.m:
            self._messages.append("warning: " + msg)
            if self._started:
                self.dump()

    def error(self, msg):
        with self.m:
            self._messages.append("error: " + msg)
            if self._started:
                self.dump()

    def getPrefix(self):
        return ""

    def cloneWithPrefix(self, prefix):
        return self

    def dump(self):
        for p in self._messages:
            print(p)
        self._messages = []

class TestCase(threading.Thread):
    def __init__(self, name, com):
        threading.Thread.__init__(self)
        self._name = name
        self._com = com
        self._logger = LoggerI()
        self._clientACMTimeout = -1
        self._clientACMClose = -1
        self._clientACMHeartbeat = -1
        self._serverACMTimeout = -1
        self._serverACMClose = -1
        self._serverACMHeartbeat = -1
        self._heartbeat = 0
        self._closed = False
        self._msg = ""
        self.m = threading.Condition()

    def init(self):
        self._adapter = \
            self._com.createObjectAdapter(self._serverACMTimeout, self._serverACMClose, self._serverACMHeartbeat)

        initData = Ice.InitializationData()
        initData.properties = self._com.ice_getCommunicator().getProperties().clone()
        initData.logger = self._logger
        initData.properties.setProperty("Ice.ACM.Timeout", "2")
        if self._clientACMTimeout >= 0:
            initData.properties.setProperty("Ice.ACM.Client.Timeout", str(self._clientACMTimeout))
        if self._clientACMClose >= 0:
            initData.properties.setProperty("Ice.ACM.Client.Close", str(self._clientACMClose))
        if self._clientACMHeartbeat >= 0:
            initData.properties.setProperty("Ice.ACM.Client.Heartbeat", str(self._clientACMHeartbeat))
        #initData.properties.setProperty("Ice.Trace.Protocol", "2")
        #initData.properties.setProperty("Ice.Trace.Network", "2")
        self._communicator = Ice.initialize(initData)

    def destroy(self):
        self._adapter.deactivate()
        self._communicator.destroy()

    def joinWithThread(self):
        sys.stdout.write("testing " + self._name + "... ")
        sys.stdout.flush()
        self._logger.start()
        self.join()
        if len(self._msg) == 0:
            print("ok")
        else:
            print("failed!\n" + self._msg)
            test(False)

    def run(self):
        proxy = Test.TestIntfPrx.uncheckedCast(self._communicator.stringToProxy(
                    self._adapter.getTestIntf().ice_toString()))
        try:
            proxy.ice_getConnection().setCloseCallback(lambda conn: self.closed(conn))
            proxy.ice_getConnection().setHeartbeatCallback(lambda conn: self.heartbeat(conn))

            self.runTestCase(self._adapter, proxy)
        except Exception as ex:
            self._msg = "unexpected exception:\n" + traceback.format_exc()

    def heartbeat(self, con):
        with self.m:
            self._heartbeat = self._heartbeat + 1

    def closed(self, con):
        with self.m:
            self._closed = True
            self.m.notify()

    def waitForClosed(self):
        with self.m:
            while not self._closed:
                now = time.time()
                self.m.wait(30.0) # Wait 30s
                if time.time() - now > 30.0:
                    test(False)

    def runTestCase(self, adapter, proxy):
        test(False)

    def setClientACM(self, timeout, close, heartbeat):
        self._clientACMTimeout = timeout
        self._clientACMClose = close
        self._clientACMHeartbeat = heartbeat

    def setServerACM(self, timeout, close, heartbeat):
        self._serverACMTimeout = timeout
        self._serverACMClose = close
        self._serverACMHeartbeat = heartbeat

def allTests(helper, communicator):
    ref = "communicator:{0}".format(helper.getTestEndpoint(num=0))
    com = Test.RemoteCommunicatorPrx.uncheckedCast(communicator.stringToProxy(ref))

    tests = []

    class InvocationHeartbeatTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "invocation heartbeat", com)
            self.setServerACM(1, -1, -1) # Faster ACM to make sure we receive enough ACM heartbeats

        def runTestCase(self, adapter, proxy):
            proxy.sleep(4)

            with self.m:
                test(self._heartbeat >= 4)

    class InvocationHeartbeatOnHoldTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "invocation with heartbeat on hold", com)
            # Use default ACM configuration.

        def runTestCase(self, adapter, proxy):
            try:
                # When the OA is put on hold, connections shouldn't
                # send heartbeats, the invocation should therefore
                # fail.
                proxy.sleepAndHold(10)
                test(False)
            except Ice.ConnectionTimeoutException:
                adapter.activate()
                proxy.interruptSleep()
                self.waitForClosed()

    class InvocationNoHeartbeatTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "invocation with no heartbeat", com)
            self.setServerACM(2, 2, 0) # Disable heartbeat on invocations

        def runTestCase(self, adapter, proxy):
            try:
                # Heartbeats are disabled on the server, the
                # invocation should fail since heartbeats are
                # expected.
                proxy.sleep(10)
                test(False)
            except Ice.ConnectionTimeoutException:
                proxy.interruptSleep()
                self.waitForClosed()

                with self.m:
                    test(self._heartbeat == 0)

    class InvocationHeartbeatCloseOnIdleTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "invocation with no heartbeat and close on idle", com)
            self.setClientACM(1, 1, 0) # Only close on idle.
            self.setServerACM(1, 2, 0) # Disable heartbeat on invocations

        def runTestCase(self, adapter, proxy):
            # No close on invocation, the call should succeed this time.
            proxy.sleep(3)

            with self.m:
                test(self._heartbeat == 0)
                test(not self._closed)

    class CloseOnIdleTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "close on idle", com)
            self.setClientACM(1, 1, 0) # Only close on idle.

        def runTestCase(self, adapter, proxy):
            self.waitForClosed()

            with self.m:
                test(self._heartbeat == 0)

    class CloseOnInvocationTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "close on invocation", com)
            self.setClientACM(1, 2, 0) # Only close on invocation.

        def runTestCase(self, adapter, proxy):
            time.sleep(3) # Idle for 3 seconds

            with self.m:
                test(self._heartbeat == 0)
                test(not self._closed)

    class CloseOnIdleAndInvocationTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "close on idle and invocation", com)
            self.setClientACM(3, 3, 0) # Only close on idle and invocation.

        def runTestCase(self, adapter, proxy):
            #
            # Put the adapter on hold. The server will not respond to
            # the graceful close. This allows to test whether or not
            # the close is graceful or forceful.
            #
            adapter.hold()
            time.sleep(5) # Idle for 5 seconds

            with self.m:
                test(self._heartbeat == 0)
                test(not self._closed) # Not closed yet because of graceful close.

            adapter.activate()
            self.waitForClosed()

    class ForcefulCloseOnIdleAndInvocationTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "forceful close on idle and invocation", com)
            self.setClientACM(1, 4, 0) # Only close on idle and invocation.

        def runTestCase(self, adapter, proxy):
            adapter.hold()
            self.waitForClosed()

            with self.m:
                test(self._heartbeat == 0)

    class HeartbeatOnIdleTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "heartbeat on idle", com)
            self.setServerACM(1, -1, 2) # Enable server heartbeats.

        def runTestCase(self, adapter, proxy):
            time.sleep(3)

            with self.m:
                test(self._heartbeat >= 3)

    class HeartbeatAlwaysTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "heartbeat always", com)
            self.setServerACM(1, -1, 3) # Enable server heartbeats.

        def runTestCase(self, adapter, proxy):
            for i in range(0, 10):
                proxy.ice_ping()
                time.sleep(0.3)

            with self.m:
                test(self._heartbeat >= 3)

    class HeartbeatManualTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "manual heartbeats", com)
            #
            # Disable heartbeats.
            #
            self.setClientACM(10, -1, 0)
            self.setServerACM(10, -1, 0)

        def runTestCase(self, adapter, proxy):
            proxy.startHeartbeatCount()
            con = proxy.ice_getConnection()
            con.heartbeat()
            con.heartbeat()
            con.heartbeat()
            con.heartbeat()
            con.heartbeat()
            proxy.waitForHeartbeatCount(5)

    class SetACMTest(TestCase):
        def __init__(self, com):
            TestCase.__init__(self, "setACM/getACM", com)
            self.setClientACM(15, 4, 0)

        def runTestCase(self, adapter, proxy):
            try:
                proxy.ice_getCachedConnection().setACM(-19, Ice.Unset, Ice.Unset)
                test(False)
            except RuntimeError:
                pass

            acm = proxy.ice_getCachedConnection().getACM()
            test(acm.timeout == 15)
            test(acm.close == Ice.ACMClose.CloseOnIdleForceful)
            test(acm.heartbeat == Ice.ACMHeartbeat.HeartbeatOff)

            proxy.ice_getCachedConnection().setACM(Ice.Unset, Ice.Unset, Ice.Unset)
            acm = proxy.ice_getCachedConnection().getACM()
            test(acm.timeout == 15)
            test(acm.close == Ice.ACMClose.CloseOnIdleForceful)
            test(acm.heartbeat == Ice.ACMHeartbeat.HeartbeatOff)

            proxy.ice_getCachedConnection().setACM(1, Ice.ACMClose.CloseOnInvocationAndIdle,
                                                   Ice.ACMHeartbeat.HeartbeatAlways)
            acm = proxy.ice_getCachedConnection().getACM()
            test(acm.timeout == 1)
            test(acm.close == Ice.ACMClose.CloseOnInvocationAndIdle)
            test(acm.heartbeat == Ice.ACMHeartbeat.HeartbeatAlways)

            proxy.startHeartbeatCount()
            proxy.waitForHeartbeatCount(2)

    tests.append(InvocationHeartbeatTest(com))
    tests.append(InvocationHeartbeatOnHoldTest(com))
    tests.append(InvocationNoHeartbeatTest(com))
    tests.append(InvocationHeartbeatCloseOnIdleTest(com))

    tests.append(CloseOnIdleTest(com))
    tests.append(CloseOnInvocationTest(com))
    tests.append(CloseOnIdleAndInvocationTest(com))
    tests.append(ForcefulCloseOnIdleAndInvocationTest(com))

    tests.append(HeartbeatOnIdleTest(com))
    tests.append(HeartbeatAlwaysTest(com))
    tests.append(HeartbeatManualTest(com))
    tests.append(SetACMTest(com))

    for p in tests:
        p.init()
    for p in tests:
        p.start()
    for p in tests:
        p.joinWithThread()
    for p in tests:
        p.destroy()

    sys.stdout.write("shutting down... ")
    sys.stdout.flush()
    com.shutdown()
    print("ok")
