1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
#
# Copyright (c) ZeroC, Inc. All rights reserved.
#
import sys, Ice, Test
def test(b):
if not b:
raise RuntimeError('test assertion failed')
def _begin_step(message):
    """Write *message* to stdout without a trailing newline and flush it."""
    sys.stdout.write(message)
    sys.stdout.flush()


def allTests(helper, communicator):
    """Run the client-side object adapter checks and return the test proxy.

    Each step prints a progress message, performs its checks through
    ``test()`` and prints ``ok`` on success.

    Parameters:
        helper: test helper; only ``getTestEndpoint()`` is used, to build the
            ``test:<endpoint>`` proxy strings.
        communicator: Ice communicator used to create proxies and object
            adapters. Its ``PAdapter.PublishedEndpoints`` property is
            modified by the published-endpoints step.

    Returns:
        The ``Test.TestIntfPrx`` proxy obtained from the checked cast.

    Raises:
        RuntimeError: raised by ``test()`` when a check fails.
    """
    _begin_step("testing stringToProxy... ")
    base = communicator.stringToProxy("test:{0}".format(helper.getTestEndpoint()))
    test(base)
    print("ok")

    _begin_step("testing checked cast... ")
    obj = Test.TestIntfPrx.checkedCast(base)
    test(obj)
    test(obj == base)
    print("ok")

    _begin_step("creating/destroying/recreating object adapter... ")
    adapter = communicator.createObjectAdapterWithEndpoints("TransientTestAdapter", "default")
    try:
        # Creating a second adapter with the same name while the first one
        # still exists is expected to fail.
        communicator.createObjectAdapterWithEndpoints("TransientTestAdapter", "default")
        test(False)
    except Ice.LocalException:
        pass
    adapter.destroy()
    # Once the first adapter is destroyed, the name can be used again.
    adapter = communicator.createObjectAdapterWithEndpoints("TransientTestAdapter", "default")
    adapter.destroy()
    print("ok")

    _begin_step("creating/activating/deactivating object adapter in one operation... ")
    obj.transient()
    print("ok")

    _begin_step("testing connection closure... ")
    for _ in range(10):
        initData = Ice.InitializationData()
        initData.properties = communicator.getProperties().clone()
        comm = Ice.initialize(initData)
        try:
            # The result of the asynchronous ping is deliberately not awaited:
            # the communicator is destroyed right after the call is issued.
            comm.stringToProxy("test:{0}".format(helper.getTestEndpoint())).ice_pingAsync()
        finally:
            # Destroy the communicator even if issuing the ping raised, so a
            # failing iteration does not leak it.
            comm.destroy()
    print("ok")

    _begin_step("testing object adapter published endpoints... ")
    communicator.getProperties().setProperty("PAdapter.PublishedEndpoints", "tcp -h localhost -p 12345 -t 30000")
    adapter = communicator.createObjectAdapter("PAdapter")
    test(len(adapter.getPublishedEndpoints()) == 1)
    endpt = adapter.getPublishedEndpoints()[0]
    test(str(endpt) == "tcp -h localhost -p 12345 -t 30000")
    prx = communicator.stringToProxy("dummy:tcp -h localhost -p 12346 -t 20000:tcp -h localhost -p 12347 -t 10000")
    adapter.setPublishedEndpoints(prx.ice_getEndpoints())
    test(len(adapter.getPublishedEndpoints()) == 2)
    ident = Ice.Identity()
    ident.name = "dummy"
    test(adapter.createProxy(ident).ice_getEndpoints() == prx.ice_getEndpoints())
    test(adapter.getPublishedEndpoints() == prx.ice_getEndpoints())
    # Refreshing is checked to restore the single endpoint from the property.
    adapter.refreshPublishedEndpoints()
    test(len(adapter.getPublishedEndpoints()) == 1)
    test(adapter.getPublishedEndpoints()[0] == endpt)
    communicator.getProperties().setProperty("PAdapter.PublishedEndpoints", "tcp -h localhost -p 12345 -t 20000")
    adapter.refreshPublishedEndpoints()
    test(len(adapter.getPublishedEndpoints()) == 1)
    test(str(adapter.getPublishedEndpoints()[0]) == "tcp -h localhost -p 12345 -t 20000")
    adapter.destroy()
    test(len(adapter.getPublishedEndpoints()) == 0)
    print("ok")

    if obj.ice_getConnection():
        _begin_step("testing object adapter with bi-dir connection... ")
        adapter = communicator.createObjectAdapter("")
        obj.ice_getConnection().setAdapter(adapter)
        obj.ice_getConnection().setAdapter(None)
        adapter.deactivate()
        try:
            # Attaching a deactivated adapter to the connection must fail.
            obj.ice_getConnection().setAdapter(adapter)
            test(False)
        except Ice.ObjectAdapterDeactivatedException:
            pass
        print("ok")

    _begin_step("testing object adapter with router... ")
    routerId = Ice.Identity()
    routerId.name = "router"
    router = Ice.RouterPrx.uncheckedCast(base.ice_identity(routerId).ice_connectionId("rc"))
    adapter = communicator.createObjectAdapterWithRouter("", router)
    test(len(adapter.getPublishedEndpoints()) == 1)
    test(str(adapter.getPublishedEndpoints()[0]) == "tcp -h localhost -p 23456 -t 30000")
    adapter.refreshPublishedEndpoints()
    test(len(adapter.getPublishedEndpoints()) == 1)
    test(str(adapter.getPublishedEndpoints()[0]) == "tcp -h localhost -p 23457 -t 30000")
    try:
        adapter.setPublishedEndpoints(router.ice_getEndpoints())
    except Exception:
        # Expected: the call must fail for an adapter created with a router.
        # The exception type is not pinned down here, so any Exception counts.
        pass
    else:
        # Kept outside the try body so that this failure is not swallowed by
        # the handler above.
        test(False)
    adapter.destroy()

    try:
        routerId.name = "test"
        router = Ice.RouterPrx.uncheckedCast(base.ice_identity(routerId))
        communicator.createObjectAdapterWithRouter("", router)
        test(False)
    except Ice.OperationNotExistException:
        # Expected: the "test" object doesn't implement Ice::Router!
        pass
    print("ok")

    _begin_step("deactivating object adapter in the server... ")
    obj.deactivate()
    print("ok")

    _begin_step("testing whether server is gone... ")
    try:
        obj.ice_timeout(100).ice_ping()  # Use timeout to speed up testing on Windows
        test(False)
    except Ice.LocalException:
        print("ok")

    return obj
|