1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
#!/usr/bin/env python
# **********************************************************************
#
# Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
#
# This copy of Ice is licensed to you under the terms described in the
# ICE_LICENSE file included in this distribution.
#
# **********************************************************************
import os, sys, traceback
import Ice
Ice.loadSlice('Test.ice')
import Test
import RouterI
def test(b):
if not b:
raise RuntimeError('test assertion failed')
def run(args, communicator, sync):
    """Exercise the Hello object behind the proxy "test" on port 12010.

    args         -- not used by this function; kept for the caller's signature.
    communicator -- Ice communicator used to turn proxy strings into proxies.
    sync         -- when true, hello.shutdown() is invoked once all checks
                    have passed.

    Returns True. Any failed check raises RuntimeError (via test()), and
    unexpected Ice exceptions propagate to the caller.
    """
    hello = Test.HelloPrx.checkedCast(communicator.stringToProxy("test:default -p 12010 -t 10000"))

    # Plain invocations, with and without an explicit request context.
    hello.sayHello(False)
    hello.sayHello(False, { "_fwd":"o" } )
    test(hello.add(10, 20) == 30)

    # The user exception must reach the client as Test.UE.
    try:
        hello.raiseUE()
        test(False)
    except Test.UE:
        pass

    # An identity that is not served on the endpoint must be reported as
    # ObjectNotExistException.
    try:
        Test.HelloPrx.checkedCast(communicator.stringToProxy("unknown:default -p 12010 -t 10000"))
        test(False)
    except Ice.ObjectNotExistException:
        pass

    # Try an object at a non-existent endpoint (port 12000 is presumably
    # not listened on by anything in this test -- verify against the server
    # setup). The failure is expected to surface as UnknownLocalException
    # whose text names the underlying ConnectionRefusedException.
    try:
        Test.HelloPrx.checkedCast(communicator.stringToProxy("missing:default -p 12000 -t 10000"))
        test(False)
    except Ice.UnknownLocalException:
        # sys.exc_info() is used instead of the Python-2-only "except X, e"
        # spelling so this handler does not depend on the interpreter version.
        e = sys.exc_info()[1]
        # str.find() returns -1 (truthy) when the text is absent and 0
        # (falsy) for a match at index 0, so its raw result must not be
        # used as a condition; test for membership instead.
        test('ConnectionRefusedException' in e.unknown)

    if sync:
        hello.shutdown()
    return True
argv = sys.argv[:] # Clone the arguments to use again later
try:
initData = Ice.InitializationData()
initData.properties = Ice.createProperties(argv)
initData.properties.setProperty('Ice.Warn.Dispatch', '0')
communicator = Ice.initialize(argv, initData)
router = RouterI.RouterI(communicator, False)
print "testing async blobject...",
sys.stdout.flush()
status = run(sys.argv, communicator, False)
print "ok"
router.destroy()
except:
traceback.print_exc()
status = False
if communicator:
try:
communicator.destroy()
except:
traceback.print_exc()
status = False
if status:
try:
initData = Ice.InitializationData()
initData.properties = Ice.createProperties(sys.argv)
initData.properties.setProperty('Ice.Warn.Dispatch', '0')
communicator = Ice.initialize(sys.argv, initData)
router = RouterI.RouterI(communicator, True)
print "testing sync blobject...",
sys.stdout.flush()
status = run(sys.argv, communicator, True)
print "ok"
router.destroy()
except:
traceback.print_exc()
status = False
if communicator:
try:
communicator.destroy()
except:
traceback.print_exc()
status = False
sys.exit(not status)
|