File: runner.py

package info (click to toggle)
python-autobahn 17.10.1%2Bdfsg1-7
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 4,452 kB
  • sloc: python: 22,598; javascript: 2,705; makefile: 497; sh: 3
file content (112 lines) | stat: -rw-r--r-- 4,014 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import six
import asyncio
import signal
from six.moves.urllib.parse import urlparse
from autobahn.wamp.types import ComponentConfig
from autobahn.asyncio.rawsocket import WampRawSocketClientFactory
import txaio


# TODO - unify with previous class
class ApplicationRunnerRawSocket(object):
    """
    This class is a convenience tool mainly for development and quick hosting
    of WAMP application components.

    It can host a WAMP application component in a WAMP-over-WebSocket client
    connecting to a WAMP router.
    """
    log=txaio.make_logger()
    def __init__(self, url, realm, extra=None, serializer=None):
        """
        :param url: Raw socket unicode - either path on local server to unix socket (or unix:/path)
             or tcp://host:port for internet socket
        :type url: unicode

        :param realm: The WAMP realm to join the application session to.
        :type realm: unicode

        :param extra: Optional extra configuration to forward to the application component.
        :type extra: dict

        :param serializer:  WAMP serializer to use (or None for default serializer).
        :type serializer: `autobahn.wamp.interfaces.ISerializer`
        """
        assert(type(url) == six.text_type)
        assert(type(realm) == six.text_type)
        assert(extra is None or type(extra) == dict)
        self.url = url
        self.realm = realm
        self.extra = extra or dict()
        self.serializer = serializer

    def run(self, make, logging_level='info'):
        """
        Run the application component.

        :param make: A factory that produces instances of :class:`autobahn.asyncio.wamp.ApplicationSession`
           when called with an instance of :class:`autobahn.wamp.types.ComponentConfig`.
        :type make: callable
        """

        def create():
            cfg = ComponentConfig(self.realm, self.extra)
            try:
                session = make(cfg)
            except Exception:
                self.log.failure("App session could not be created! ")
                asyncio.get_event_loop().stop()
            else:
                return session

        parsed_url = urlparse(self.url)

        if parsed_url.scheme == 'tcp':
            is_unix = False
            if not parsed_url.hostname or not parsed_url.port:
                raise ValueError('Host and port is required in URL')
        elif parsed_url.scheme == 'unix' or parsed_url.scheme == '':
            is_unix = True
            if not parsed_url.path:
                raise ValueError('Path to unix socket must be in URL')

        transport_factory = WampRawSocketClientFactory(create, serializer=self.serializer)

        loop = asyncio.get_event_loop()
        if logging_level == 'debug':
            loop.set_debug(True)
        txaio.use_asyncio()
        txaio.config.loop = loop

        try:
            loop.add_signal_handler(signal.SIGTERM, loop.stop)
        except NotImplementedError:
            # signals are not available on Windows
            pass

        def handle_error(loop, context):
            self.log.error('Application Error: {err}', err=context)
            loop.stop()

        loop.set_exception_handler(handle_error)

        if is_unix:
            coro = loop.create_unix_connection(transport_factory, parsed_url.path)
        else:
            coro = loop.create_connection(transport_factory, parsed_url.hostname, parsed_url.port)
        (_transport, protocol) = loop.run_until_complete(coro)

        txaio.start_logging(level=logging_level)  # @UndefinedVariable

        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
        self.log.debug('Left main loop waiting for completion')
        # give Goodbye message a chance to go through, if we still
        # have an active session
        # it's not working now - because protocol is_closed must return Future
        if protocol._session:
            loop.run_until_complete(protocol._session.leave())

        loop.close()