1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
import asyncio
import io
import logging
import ssl
import aiohttp
from aiohttp import web
from amqtt.adapters import ReaderAdapter, WriterAdapter
from amqtt.broker import Broker
from amqtt.contexts import BrokerConfig, ListenerConfig, ListenerType
# module-level logger, named after this module
logger = logging.getLogger(__name__)
# name of the externalized listener: used as the listener key in the broker config
# and passed to the broker when a websocket connection is handed off
MQTT_LISTENER_NAME = "myMqttListener"
async def hello(request):
    """Answer a plain HTTP GET request with a fixed text greeting."""
    greeting = web.Response(text="Hello, world")
    return greeting
class WebSocketResponseReader(ReaderAdapter):
    """Interface to allow mqtt broker to read from an aiohttp websocket connection.

    Websocket data arrives as whole messages while the broker asks for byte
    counts, so binary messages are accumulated in ``self.buffer`` and handed
    out in the sizes requested.
    """

    # message types after which no further data will arrive on the websocket
    _TERMINAL_TYPES = (
        aiohttp.WSMsgType.CLOSE,
        aiohttp.WSMsgType.CLOSING,
        aiohttp.WSMsgType.CLOSED,
        aiohttp.WSMsgType.ERROR,
    )

    def __init__(self, ws: web.WebSocketResponse):
        self.ws = ws
        self.buffer = bytearray()

    async def read(self, n: int = -1) -> bytes:
        """Read 'n' bytes from the datastream, if < 0 read all available bytes.

        Args:
            n: number of bytes to return; any negative value returns everything
                currently buffered (waiting until at least one byte is available),
                and 0 returns an empty result immediately.

        Returns:
            The requested data, always as an immutable ``bytes`` object.

        Raises:
            BrokenPipeError: if the websocket is closed, a close/error message is
                received, or no message arrives within 0.5 seconds.
        """
        # nothing requested: do not wait on the websocket at all
        if n == 0:
            return b""

        # continue until buffer contains at least the amount of data being requested
        while not self.buffer or len(self.buffer) < n:
            # if the websocket is closed
            if self.ws.closed:
                raise BrokenPipeError

            try:
                # read from stream
                msg = await asyncio.wait_for(self.ws.receive(), timeout=0.5)
            except asyncio.TimeoutError as exc:
                raise BrokenPipeError from exc

            # mqtt streams should always be binary...
            if msg.type == aiohttp.WSMsgType.BINARY:
                self.buffer.extend(msg.data)
            elif msg.type in self._TERMINAL_TYPES:
                raise BrokenPipeError

        if n < 0:
            # return all bytes currently in the buffer
            result = bytes(self.buffer)
            self.buffer.clear()
        else:
            # return the requested number of bytes from the buffer
            result = bytes(self.buffer[:n])
            del self.buffer[:n]
        return result

    def feed_eof(self) -> None:
        """Do nothing; end-of-stream is detected from the websocket state in `read`."""
class WebSocketResponseWriter(WriterAdapter):
    """Interface to allow mqtt broker to write to an aiohttp websocket connection.

    Bytes passed to `write` are collected in an in-memory buffer and sent to the
    websocket as a single binary message when `drain` is awaited.
    """

    def __init__(self, ws: web.WebSocketResponse, request: web.Request):
        super().__init__()
        self.ws = ws

        # needed for `get_peer_info`
        # https://docs.python.org/3/library/socket.html#socket.socket.getpeername
        # the transport can be missing (e.g. connection already gone); fall back below
        transport = request.transport
        peer_name = transport.get_extra_info("peername") if transport is not None else None
        if peer_name is not None:
            self.client_ip, self.port = peer_name[0:2]
        else:
            self.client_ip, self.port = request.remote, 0

        # interpret AF_INET6
        if self.client_ip == "::1":
            self.client_ip = "localhost"

        self._stream = io.BytesIO(b"")

    def write(self, data: bytes) -> None:
        """Add bytes to stream buffer."""
        self._stream.write(data)

    async def drain(self) -> None:
        """Send the collected bytes in the buffer to the websocket connection."""
        data = self._stream.getvalue()
        if not data:
            return
        # swap in a fresh buffer *before* awaiting, so bytes written while the
        # send is in progress are kept for the next drain instead of discarded
        self._stream = io.BytesIO(b"")
        await self.ws.send_bytes(data)

    def get_peer_info(self) -> tuple[str, int] | None:
        """Return the client address and port captured when the writer was created."""
        return self.client_ip, self.port

    async def close(self) -> None:
        """Do nothing; the buffer is garbage collected along with the instance."""

    def get_ssl_info(self) -> ssl.SSLObject | None:
        """Return None; this adapter does not expose TLS details."""
        return None
async def mqtt_websocket_handler(request: web.Request) -> web.StreamResponse:
    """Upgrade the request to a websocket and hand its datastream to the broker.

    Returns the websocket response once the broker has finished with the connection.
    """
    # answer the websocket request, offering the 'mqtt' protocol
    ws = web.WebSocketResponse(protocols=["mqtt"])
    await ws.prepare(request)

    # the broker instance stored on the application when the server started
    broker: Broker = request.app["broker"]

    # wrap the websocket so the broker can read from and write to it;
    # the listener name must match the externalized listener in the broker config
    reader = WebSocketResponseReader(ws)
    writer = WebSocketResponseWriter(ws, request)
    await broker.external_connected(reader, writer, MQTT_LISTENER_NAME)

    logger.debug("websocket connection closed")
    return ws
async def websocket_handler(request: web.Request) -> web.StreamResponse:
    """Accept a standard websocket connection and log every message received.

    Returns the websocket response once the client has disconnected.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # use the module-level logger (as the mqtt handler does) rather than the root logger
    async for msg in ws:
        logger.info("%s", msg)

    logger.info("websocket connection closed")
    return ws
def main() -> None:
    """Create the `aiohttp` application, register its routes and run the server.

    The `amqtt` broker is started and stopped through the application's
    cleanup context, so it lives exactly as long as the web server does.
    """
    # create the loop explicitly: `asyncio.get_event_loop()` is deprecated when
    # no event loop is running, as is the case at program start-up
    loop = asyncio.new_event_loop()

    # create an `aiohttp` server
    app = web.Application()
    app.add_routes(
        [
            web.get("/", hello),  # http get request/response route
            web.get("/ws", websocket_handler),  # standard websocket handler
            web.get("/mqtt", mqtt_websocket_handler),  # websocket handler for mqtt connections
        ]
    )

    # create background task for running the `amqtt` broker
    app.cleanup_ctx.append(run_broker)

    # make sure that both `aiohttp` server and `amqtt` broker run in the same loop
    # so the server can hand off the connection to the broker
    # (prevents attached-to-a-different-loop `RuntimeError`)
    web.run_app(app, loop=loop)
async def run_broker(_app):
    """App init function to start (and then shutdown) the `amqtt` broker.

    Used as an `aiohttp` cleanup context: the code before ``yield`` runs at
    application start-up, the code after it when the application shuts down.

    https://docs.aiohttp.org/en/stable/web_advanced.html#background-tasks
    """
    # standard TCP connection as well as an externalized-listener
    cfg = BrokerConfig(
        listeners={
            "default": ListenerConfig(type=ListenerType.TCP, bind="127.0.0.1:1883"),
            MQTT_LISTENER_NAME: ListenerConfig(type=ListenerType.EXTERNAL),
        }
    )

    # make sure the `Broker` runs in the same loop as the aiohttp server;
    # inside a coroutine the running loop is the one serving the web app
    loop = asyncio.get_running_loop()
    broker = Broker(config=cfg, loop=loop)

    # store broker instance so that incoming requests can hand off processing of a datastream
    _app["broker"] = broker

    # start the broker
    await broker.start()

    # pass control back to web app
    yield

    # closing activities
    await broker.shutdown()
# script entry point: only runs when the file is executed directly
if __name__ == "__main__":
    # emit INFO-level records (and above) before the server starts logging
    logging.basicConfig(
        level=logging.INFO,
    )
    main()
|