File: http_server_integration.py

package info (click to toggle)
amqtt 0.11.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,660 kB
  • sloc: python: 14,565; sh: 42; makefile: 34; javascript: 27
file content (184 lines) | stat: -rw-r--r-- 6,046 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import asyncio
import io
import logging
import ssl

import aiohttp
from aiohttp import web

from amqtt.adapters import ReaderAdapter, WriterAdapter
from amqtt.broker import Broker
from amqtt.contexts import BrokerConfig, ListenerConfig, ListenerType

logger = logging.getLogger(__name__)

MQTT_LISTENER_NAME = "myMqttListener"

async def hello(request):
    """Get request handler"""
    return web.Response(text="Hello, world")

class WebSocketResponseReader(ReaderAdapter):
    """Interface to allow mqtt broker to read from an aiohttp websocket connection."""

    def __init__(self, ws: web.WebSocketResponse):
        self.ws = ws
        self.buffer = bytearray()

    async def read(self, n: int = -1) -> bytes:
        """Read 'n' bytes from the datastream, if < 0 read all available bytes

        Raises:
            BrokerPipeError : if reading on a closed websocket connection

        """
        # continue until buffer contains at least the amount of data being requested
        while not self.buffer or len(self.buffer) < n:
            # if the websocket is closed
            if self.ws.closed:
                raise BrokenPipeError

            try:
                # read from stream
                msg = await asyncio.wait_for(self.ws.receive(), timeout=0.5)
                # mqtt streams should always be binary...
                if msg.type == aiohttp.WSMsgType.BINARY:
                    self.buffer.extend(msg.data)
                elif msg.type == aiohttp.WSMsgType.CLOSE:
                    raise BrokenPipeError

            except asyncio.TimeoutError:
                raise BrokenPipeError

        # return all bytes currently in the buffer
        if n == -1:
            result = bytes(self.buffer)
            self.buffer.clear()
        # return the requested number of bytes from the buffer
        else:
            result = self.buffer[:n]
            del self.buffer[:n]

        return result

    def feed_eof(self) -> None:
        pass

class WebSocketResponseWriter(WriterAdapter):
    """Interface to allow mqtt broker to write to an aiohttp websocket connection."""

    def __init__(self, ws: web.WebSocketResponse, request: web.Request):
        super().__init__()
        self.ws = ws

        # needed for `get_peer_info`
        # https://docs.python.org/3/library/socket.html#socket.socket.getpeername
        peer_name = request.transport.get_extra_info("peername")
        if peer_name is not None:
            self.client_ip, self.port = peer_name[0:2]
        else:
            self.client_ip, self.port = request.remote, 0

        # interpret AF_INET6
        self.client_ip = "localhost" if self.client_ip == "::1" else self.client_ip

        self._stream = io.BytesIO(b"")

    def write(self, data: bytes) -> None:
        """Add bytes to stream buffer."""
        self._stream.write(data)

    async def drain(self) -> None:
        """Send the collected bytes in the buffer to the websocket connection."""
        data = self._stream.getvalue()
        if data and len(data):
            await self.ws.send_bytes(data)
        self._stream = io.BytesIO(b"")

    def get_peer_info(self) -> tuple[str, int] | None:
        return self.client_ip, self.port

    async def close(self) -> None:
        # no clean up needed, stream will be gc along with instance
        pass

    def get_ssl_info(self) -> ssl.SSLObject | None:
        pass


async def mqtt_websocket_handler(request: web.Request) -> web.StreamResponse:

    # establish connection by responding to the websocket request with the 'mqtt' protocol
    ws = web.WebSocketResponse(protocols=["mqtt"])
    await ws.prepare(request)

    # access the broker created when the server started
    b: Broker = request.app["broker"]

    # hand-off the websocket data stream to the broker for handling
    # `listener_name` is the same name of the externalized listener in the broker config
    await b.external_connected(WebSocketResponseReader(ws), WebSocketResponseWriter(ws, request), MQTT_LISTENER_NAME)

    logger.debug("websocket connection closed")
    return ws


async def websocket_handler(request: web.Request) -> web.StreamResponse:
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        logging.info(msg)

    logging.info("websocket connection closed")
    return ws

def main():
    # create an `aiohttp` server
    lp = asyncio.get_event_loop()
    app = web.Application()
    app.add_routes(
        [
            web.get("/", hello), # http get request/response route
            web.get("/ws", websocket_handler), # standard websocket handler
            web.get("/mqtt", mqtt_websocket_handler), # websocket handler for mqtt connections
        ])
    # create background task for running the `amqtt` broker
    app.cleanup_ctx.append(run_broker)

    # make sure that both `aiohttp` server and `amqtt` broker run in the same loop
    #  so the server can hand off the connection to the broker (prevents attached-to-a-different-loop `RuntimeError`)
    web.run_app(app, loop=lp)


async def run_broker(_app):
    """App init function to start (and then shutdown) the `amqtt` broker.
    https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks
    """
    # standard TCP connection as well as an externalized-listener
    cfg = BrokerConfig(
        listeners={
            "default":ListenerConfig(type=ListenerType.TCP, bind="127.0.0.1:1883"),
            MQTT_LISTENER_NAME: ListenerConfig(type=ListenerType.EXTERNAL),
        }
    )

    # make sure the `Broker` runs in the same loop as the aiohttp server
    loop = asyncio.get_event_loop()
    broker = Broker(config=cfg, loop=loop)

    # store broker instance so that incoming requests can hand off processing of a datastream
    _app["broker"] = broker
    # start the broker
    await broker.start()

    # pass control back to web app
    yield

    # closing activities
    await broker.shutdown()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()