1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
from autobahn.twisted.component import Component, run
from autobahn.wamp.types import RegisterOptions
from autobahn.wamp.exception import ApplicationError
from twisted.internet.defer import inlineCallbacks
@inlineCallbacks
def main(reactor, session):
print("Client session={}".format(session))
try:
res = yield session.register(lambda: None, "com.foo.private")
print("\n\nregistering 'com.foo.private' should have failed\n\n")
except ApplicationError as e:
print("registering 'com.foo.private' failed as expected: {}".format(e.error))
res = yield session.register(
lambda: None, "should.work",
options=RegisterOptions(match='exact'),
)
print("registered 'should.work' with id {}".format(res.id))
try:
res = yield session.register(
lambda: None, "prefix.fail.",
options=RegisterOptions(match='prefix'),
)
print("\n\nshould have failed\n\n")
except ApplicationError as e:
print("prefix-match 'prefix.fail.' failed as expected: {}".format(e.error))
print("calling 'example.foo'")
try:
res = yield session.call("example.foo")
except Exception as e:
# to see errors uncomment the "throw" in backend .. also try
# with .traceback_app enabled (in the backend)
print("Error from 'example.foo' call:")
print(e)
print("example.foo() = {}".format(res))
print("done")
component = Component(
transports="ws://localhost:8080/auth_ws",
main=main,
realm="crossbardemo",
authentication={
"wampcra": {
"authid": "bob",
"authrole": "dynamic_authed",
"secret": "p4ssw0rd",
}
}
)
if __name__ == "__main__":
run([component])
|