File: __init__.py

package info (click to toggle)
python-autobahn 22.7.1%2Bdfsg1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 8,404 kB
  • sloc: python: 38,356; javascript: 2,705; makefile: 905; ansic: 371; sh: 63
file content (293 lines) | stat: -rw-r--r-- 10,078 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################

# IHostnameResolver et al. were added in Twisted 17.1.0 .. before
# that, it was IResolverSimple only.

try:
    from twisted.internet.interfaces import IHostnameResolver
except ImportError:
    raise ImportError(
        "Twisted 17.1.0 or later required for autobahn.twisted.testing"
    )

from twisted.internet.defer import Deferred
from twisted.internet.address import IPv4Address
from twisted.internet._resolver import HostResolution  # "internal" class, but it's simple
from twisted.internet.interfaces import ISSLTransport, IReactorPluggableNameResolver
try:
    from twisted.internet.testing import MemoryReactorClock
except ImportError:
    from twisted.test.proto_helpers import MemoryReactorClock
from twisted.test import iosim

from zope.interface import directlyProvides, implementer

from autobahn.websocket.interfaces import IWebSocketClientAgent
from autobahn.twisted.websocket import _TwistedWebSocketClientAgent
from autobahn.twisted.websocket import WebSocketServerProtocol
from autobahn.twisted.websocket import WebSocketServerFactory


__all__ = (
    'create_pumper',
    'create_memory_agent',
    'MemoryReactorClockResolver',
)


@implementer(IHostnameResolver)
class _StaticTestResolver(object):
    def resolveHostName(self, receiver, hostName, portNumber=0):
        """
        Implement IHostnameResolver which always returns 127.0.0.1:31337
        """
        resolution = HostResolution(hostName)
        receiver.resolutionBegan(resolution)
        receiver.addressResolved(
            IPv4Address('TCP', '127.0.0.1', 31337 if portNumber == 0 else portNumber)
        )
        receiver.resolutionComplete()


@implementer(IReactorPluggableNameResolver)
class _TestNameResolver(object):
    """
    A test version of IReactorPluggableNameResolver
    """

    _resolver = None

    @property
    def nameResolver(self):
        if self._resolver is None:
            self._resolver = _StaticTestResolver()
        return self._resolver

    def installNameResolver(self, resolver):
        old = self._resolver
        self._resolver = resolver
        return old


class MemoryReactorClockResolver(MemoryReactorClock, _TestNameResolver):
    """
    Combine MemoryReactor, Clock and an IReactorPluggableNameResolver
    together.
    """
    pass


class _TwistedWebMemoryAgent(IWebSocketClientAgent):
    """
    A testing agent which will hook up an instance of
    `server_protocol` for every client that is created via the `open`
    API call.

    :param reactor: the reactor to use for tests (usually an instance
        of MemoryReactorClockResolver)

    :param pumper: an implementation IPumper (e.g. as returned by
        `create_pumper`)

    :param server_protocol: the server-side WebSocket protocol class
        to instantiate (e.g. a subclass of `WebSocketServerProtocol`
    """

    def __init__(self, reactor, pumper, server_protocol):
        self._reactor = reactor
        self._server_protocol = server_protocol
        self._pumper = pumper

        # our "real" underlying agent under test
        self._agent = _TwistedWebSocketClientAgent(self._reactor)
        self._pumps = set()
        self._servers = dict()  # client -> server

    def open(self, transport_config, options, protocol_class=None):
        """
        Implement IWebSocketClientAgent with in-memory transports.

        :param transport_config: a string starting with 'wss://' or
            'ws://'

        :param options: a dict containing options

        :param protocol_class: the client protocol class to
            instantiate (or `None` for defaults, which is to use
            `WebSocketClientProtocol`)
        """
        is_secure = transport_config.startswith("wss://")

        # call our "real" agent
        real_client_protocol = self._agent.open(
            transport_config, options,
            protocol_class=protocol_class,
        )

        if is_secure:
            host, port, factory, context_factory, timeout, bindAddress = self._reactor.sslClients[-1]
        else:
            host, port, factory, timeout, bindAddress = self._reactor.tcpClients[-1]

        server_address = IPv4Address('TCP', '127.0.0.1', port)
        client_address = IPv4Address('TCP', '127.0.0.1', 31337)

        server_protocol = self._server_protocol()
        # the protocol could already have a factory
        if getattr(server_protocol, "factory", None) is None:
            server_protocol.factory = WebSocketServerFactory()

        server_transport = iosim.FakeTransport(
            server_protocol, isServer=True,
            hostAddress=server_address, peerAddress=client_address)
        clientProtocol = factory.buildProtocol(None)
        client_transport = iosim.FakeTransport(
            clientProtocol, isServer=False,
            hostAddress=client_address, peerAddress=server_address)

        if is_secure:
            directlyProvides(server_transport, ISSLTransport)
            directlyProvides(client_transport, ISSLTransport)

        pump = iosim.connect(
            server_protocol, server_transport, clientProtocol, client_transport)
        self._pumper.add(pump)

        def add_mapping(proto):
            self._servers[proto] = server_protocol
            return proto
        real_client_protocol.addCallback(add_mapping)
        return real_client_protocol


class _Kalamazoo(object):
    """
    Feeling whimsical about class names, see https://en.wikipedia.org/wiki/Handcar

    This is 'an IOPump pumper', an object which causes a series of
    IOPumps it is monitoring to do their I/O operations
    periodically. This needs the 'real' reactor which trial drives,
    because reasons:

     - so @inlineCallbacks / async-def functions work
       (if I could explain exactly why here, I would)

     - we need to 'break the loop' of synchronous calls somewhere and
       polluting the tests themselves with that is bad

     - get rid of e.g. .flush() calls in tests themselves (thus
      'teaching' the tests about details of I/O scheduling that they
      shouldn't know).
    """

    def __init__(self):
        self._pumps = set()
        self._pumping = False
        self._waiting_for_stop = []
        from twisted.internet import reactor as global_reactor
        self._global_reactor = global_reactor

    def add(self, p):
        """
        Add a new IOPump. It will be removed when both its client and
        server are disconnected.
        """
        self._pumps.add(p)

    def start(self):
        """
        Begin triggering I/O in all IOPump instances we have. We will keep
        periodically 'pumping' our IOPumps until `.stop()` is
        called. Call from `setUp()` for example.
        """
        if self._pumping:
            return
        self._pumping = True
        self._global_reactor.callLater(0, self._pump_once)

    def stop(self):
        """
        :returns: a Deferred that fires when we have stopped pump()-ing

        Call from `tearDown()`, for example.
        """
        if self._pumping or len(self._waiting_for_stop):
            d = Deferred()
            self._waiting_for_stop.append(d)
            self._pumping = False
            return d
        d = Deferred()
        d.callback(None)
        return d

    def _pump_once(self):
        """
        flush all data from all our IOPump instances and schedule another
        iteration on the global reactor
        """
        if self._pumping:
            self._flush()
            self._global_reactor.callLater(0.1, self._pump_once)
        else:
            for d in self._waiting_for_stop:
                d.callback(None)
            self._waiting_for_stop = []

    def _flush(self):
        """
        Flush all data between pending client/server pairs.
        """
        old_pumps = self._pumps
        new_pumps = self._pumps = set()
        for p in old_pumps:
            p.flush()
            if p.clientIO.disconnected and p.serverIO.disconnected:
                continue
            new_pumps.add(p)


def create_pumper():
    """
    return a new instance implementing IPumper
    """
    return _Kalamazoo()


def create_memory_agent(reactor, pumper, server_protocol):
    """
    return a new instance implementing `IWebSocketClientAgent`.

    connection attempts will be satisfied by traversing the Upgrade
    request path starting at `resource` to find a `WebSocketResource`
    and then exchange data between client and server using purely
    in-memory buffers.
    """
    # Note, we currently don't actually do any "resource traversing"
    # and basically accept any path at all to our websocket resource
    if server_protocol is None:
        server_protocol = WebSocketServerProtocol
    return _TwistedWebMemoryAgent(reactor, pumper, server_protocol)