1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
#!/usr/bin/env python3
import asyncio
import socket
import uuid
import context # Ensures paho is in PYTHONPATH
import paho.mqtt.client as mqtt
# A random UUID suffix makes the client id unique per run; the very same
# string is reused as the topic this example publishes and subscribes to.
client_id = f"paho-mqtt-python/issue72/{uuid.uuid4()}"
topic = client_id
print(f"Using client_id / topic: {client_id}")
class AsyncioHelper:
    """Drive an MQTT client's socket from an asyncio event loop.

    On construction the helper installs itself as the client's four
    socket callbacks (open, close, register-write, unregister-write).
    Those callbacks add/remove reader and writer watchers on the loop and
    start/stop a background task that periodically calls ``loop_misc()``.
    """

    def __init__(self, loop, client):
        """Remember *loop* and *client* and hook the client's socket callbacks."""
        self.loop = loop
        self.client = client

        client.on_socket_open = self.on_socket_open
        client.on_socket_close = self.on_socket_close
        client.on_socket_register_write = self.on_socket_register_write
        client.on_socket_unregister_write = self.on_socket_unregister_write

    def on_socket_open(self, client, userdata, sock):
        """Watch *sock* for readability and start the ``misc_loop`` task."""
        print("Socket opened")

        def handle_readable():
            print("Socket is readable, calling loop_read")
            client.loop_read()

        self.loop.add_reader(sock, handle_readable)
        # Keep the task handle so on_socket_close can cancel it.
        self.misc = self.loop.create_task(self.misc_loop())

    def on_socket_close(self, client, userdata, sock):
        """Stop watching *sock* for readability and cancel the misc task."""
        print("Socket closed")
        self.loop.remove_reader(sock)
        self.misc.cancel()

    def on_socket_register_write(self, client, userdata, sock):
        """Start watching *sock* for writability."""
        print("Watching socket for writability.")

        def handle_writable():
            print("Socket is writable, calling loop_write")
            client.loop_write()

        self.loop.add_writer(sock, handle_writable)

    def on_socket_unregister_write(self, client, userdata, sock):
        """Stop watching *sock* for writability."""
        print("Stop watching socket for writability.")
        self.loop.remove_writer(sock)

    async def misc_loop(self):
        """Call ``client.loop_misc()`` once per second.

        The loop ends when ``loop_misc()`` returns anything other than
        ``mqtt.MQTT_ERR_SUCCESS`` or when the sleeping task is cancelled.
        """
        print("misc_loop started")
        while True:
            if self.client.loop_misc() != mqtt.MQTT_ERR_SUCCESS:
                break
            try:
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                # Cancellation is the normal shutdown path (see
                # on_socket_close); leave the loop quietly.
                break
        print("misc_loop finished")
class AsyncMqttExample:
    """Publish three large messages to our own topic and await each echo.

    The client callbacks communicate with ``main()`` through two asyncio
    futures: ``got_message`` (the payload of the echo currently awaited,
    or ``None`` when no echo is expected) and ``disconnected`` (the
    reason code passed to ``on_disconnect``).
    """

    def __init__(self, loop):
        """Store the event loop; the client and futures are created in main()."""
        self.loop = loop
        # Defined up front so a callback that fires before main() has set
        # things up sees well-defined attributes instead of AttributeError.
        self.client = None
        self.disconnected = None
        self.got_message = None

    def on_connect(self, client, userdata, flags, reason_code, properties):
        """Subscribe to the example topic once the connection is up."""
        print("Subscribing")
        client.subscribe(topic)

    def on_message(self, client, userdata, msg):
        """Hand the payload to the waiting future, or report a stray message."""
        pending = self.got_message
        # A future that is already done cannot take a second result
        # (set_result would raise InvalidStateError), so treat such a
        # message as unexpected too.
        if pending is None or pending.done():
            # The bytes live in msg.payload; the message object itself has
            # no decode() method.
            print("Got unexpected message: {}".format(
                msg.payload.decode(errors="replace")))
        else:
            pending.set_result(msg.payload)

    def on_disconnect(self, client, userdata, flags, reason_code, properties):
        """Resolve the ``disconnected`` future with the reason code."""
        waiter = self.disconnected
        # Only the first disconnect notification resolves the future.
        if waiter is not None and not waiter.done():
            waiter.set_result(reason_code)

    async def main(self):
        """Connect, run three publish/echo round trips, then disconnect."""
        self.disconnected = self.loop.create_future()
        self.got_message = None

        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect

        # Constructing the helper installs the socket callbacks on the
        # client; the local name only keeps a reference to it.
        aioh = AsyncioHelper(self.loop, self.client)

        # connect() is called synchronously here (it is not awaited).
        self.client.connect('mqtt.eclipseprojects.io', 1883, 60)
        # Small send buffer: presumably so the 200000-byte payload cannot
        # be written in one go and the writability callbacks get exercised
        # -- TODO confirm intent.
        self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)

        for _ in range(3):
            await asyncio.sleep(5)
            print("Publishing")
            self.got_message = self.loop.create_future()
            self.client.publish(topic, b'Hello' * 40000, qos=1)
            msg = await self.got_message
            print("Got response with {} bytes".format(len(msg)))
            self.got_message = None

        self.client.disconnect()
        print("Disconnected: {}".format(await self.disconnected))
print("Starting")
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when
# no loop is running and raises RuntimeError on newer Python versions.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(AsyncMqttExample(loop).main())
finally:
    # Release the loop's resources even if main() raised.
    loop.close()
print("Finished")
|