#
# Copyright (c) ZeroC, Inc. All rights reserved.
#
import Ice, Test, Dispatcher, sys, threading, random
def test(b):
if not b:
raise RuntimeError('test assertion failed')
class Callback:
    """Completion handlers for asynchronous invocations plus a wait/notify flag.

    The handler methods (``response``, ``exception``, ``exceptionEx``,
    ``payload``) each receive a future-like object ``f`` exposing
    ``exception()``. They verify the outcome with ``test`` and, except for
    ``payload``, signal completion through ``called()`` so that a thread
    blocked in ``check()`` can proceed.
    """

    def __init__(self):
        # Flag set by called() and consumed (reset) by check().
        self._called = False
        # Guards _called and lets check() sleep until called() notifies.
        self._cond = threading.Condition()
        # Thread that constructed this object; not read by any method here.
        self._mainThread = threading.current_thread()

    def check(self):
        """Block until ``called()`` has run, then reset the flag for reuse."""
        with self._cond:
            # Loop to guard against wake-ups that happen before the flag is set.
            while not self._called:
                self._cond.wait()
            self._called = False

    def called(self):
        """Mark the callback as invoked and wake one thread waiting in ``check()``."""
        with self._cond:
            self._called = True
            self._cond.notify()

    def response(self, f):
        """Expect ``f`` to have completed without an exception, on the dispatcher thread."""
        test(f.exception() is None)
        test(Dispatcher.Dispatcher.isDispatcherThread())
        self.called()

    def exception(self, f):
        """Expect ``f`` to have failed with ``Ice.NoEndpointException``, on the dispatcher thread."""
        test(isinstance(f.exception(), Ice.NoEndpointException))
        test(Dispatcher.Dispatcher.isDispatcherThread())
        self.called()

    def exceptionEx(self, f):
        """Expect ``f`` to have failed with ``Ice.InvocationTimeoutException``, on the dispatcher thread."""
        test(isinstance(f.exception(), Ice.InvocationTimeoutException))
        test(Dispatcher.Dispatcher.isDispatcherThread())
        self.called()

    def payload(self, f):
        """Check the outcome of a payload invocation; does not signal ``called()``.

        If ``f`` carries an exception it must be
        ``Ice.CommunicatorDestroyedException``; otherwise the handler must be
        running on the dispatcher thread.
        """
        # Compare against None explicitly, consistent with response().
        if f.exception() is not None:
            test(isinstance(f.exception(), Ice.CommunicatorDestroyedException))
        else:
            test(Dispatcher.Dispatcher.isDispatcherThread())
def allTests(helper, communicator):
    """Run the dispatcher tests against the test server, then shut it down.

    Builds a ``test`` proxy and a ``testController`` proxy from the helper's
    endpoints, then checks, using the ``Callback`` handlers, that completion
    callbacks registered with ``add_done_callback_async`` run on the
    dispatcher thread for a successful call, a ``NoEndpointException`` and an
    ``InvocationTimeoutException``. Finally it holds the server adapter,
    sends payload invocations until one is not sent synchronously, resumes
    the adapter, waits for that last invocation and shuts the server down.

    Args:
        helper: Test helper providing ``getTestEndpoint()``.
        communicator: Communicator used to create the proxies via
            ``stringToProxy``.

    Raises:
        RuntimeError: Raised by ``test`` when a check fails.
    """
    sref = "test:{0}".format(helper.getTestEndpoint())
    obj = communicator.stringToProxy(sref)
    test(obj)
    p = Test.TestIntfPrx.uncheckedCast(obj)

    sref = "testController:{0}".format(helper.getTestEndpoint(num=1))
    obj = communicator.stringToProxy(sref)
    test(obj)
    testController = Test.TestIntfControllerPrx.uncheckedCast(obj)

    sys.stdout.write("testing dispatcher... ")
    sys.stdout.flush()

    p.op()

    cb = Callback()
    p.opAsync().add_done_callback_async(cb.response)
    cb.check()

    #
    # Expect NoEndpointException.
    #
    i = p.ice_adapterId("dummy")
    i.opAsync().add_done_callback_async(cb.exception)
    cb.check()

    #
    # Expect InvocationTimeoutException.
    # NOTE(review): 10 and 500 are presumably milliseconds -- confirm
    # against the Ice API and the Test.ice definition.
    #
    to = p.ice_invocationTimeout(10)
    to.sleepAsync(500).add_done_callback_async(cb.exceptionEx)
    cb.check()

    #
    # Hold adapter to make sure invocations don't _complete_ synchronously
    #
    testController.holdAdapter()

    # 1024 random byte values. bytes(bytearray(...)) yields a str on
    # Python 2 and a bytes object on Python 3, so no version check is needed.
    seq = bytes(bytearray(random.randint(0, 255) for _ in range(1024)))

    # Keep sending until an invocation is reported as not sent synchronously.
    while True:
        f = p.opWithPayloadAsync(seq)
        f.add_done_callback(cb.payload)
        if not f.is_sent_synchronously():
            break
    testController.resumeAdapter()
    # Wait for the last invocation to complete.
    f.result()

    print("ok")

    p.shutdown()
|