1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright (c) 2020 Frank Pagliughi <fpagliughi@mindspring.com>
# All rights reserved.
#
# This program and the accompanying materials are made available
# under the terms of the Eclipse Distribution License v1.0
# which accompanies this distribution.
#
# The Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
# Frank Pagliughi - initial implementation
#
# This shows an example of an MQTTv5 Remote Procedure Call (RPC) server.
import json
import context # Ensures paho is in PYTHONPATH
import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
# The math functions exported
def add(nums):
sum = 0
for x in nums:
sum += x
return sum
def mult(nums):
prod = 1
for x in nums:
prod *= x
return prod
# Remember that the MQTTv5 callback takes the additional 'props' parameter.
def on_connect(mqttc, userdata, flags, reason_code, props):
print(f"Connected: '{flags}', '{reason_code}', '{props}'")
if not flags.session_present:
print("Subscribing to math requests")
mqttc.subscribe("requests/math/#")
# Each incoming message should be an RPC request on the
# 'requests/math/#' topic.
def on_message(mqttc, userdata, msg):
print(msg.topic + " " + str(msg.payload))
# Get the response properties, abort if they're not given
props = msg.properties
if not hasattr(props, 'ResponseTopic') or not hasattr(props, 'CorrelationData'):
print("No reply requested")
return
corr_id = props.CorrelationData
reply_to = props.ResponseTopic
# The command parameters are in the payload
nums = json.loads(msg.payload)
# The requested command is at the end of the topic
res = 0
if msg.topic.endswith("add"):
res = add(nums)
elif msg.topic.endswith("mult"):
res = mult(nums)
# Now we have the result, res, so send it back on the 'reply_to'
# topic using the same correlation ID as the request.
print("Sending response "+str(res)+" on '"+reply_to+"': "+str(corr_id))
props = mqtt.Properties(PacketTypes.PUBLISH)
props.CorrelationData = corr_id
payload = json.dumps(res)
mqttc.publish(reply_to, payload, qos=1, properties=props)
def on_log(mqttc, obj, level, string):
print(string)
# Typically with an RPC service, you want to make sure that you're the only
# client answering requests for specific topics. Using a known client ID
# might help.
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="paho_rpc_math_srvr", protocol=mqtt.MQTTv5)
mqttc.on_message = on_message
mqttc.on_connect = on_connect
# Uncomment to enable debug messages
#mqttc.on_log = on_log
mqttc.connect(host="mqtt.eclipseprojects.io", clean_start=False)
mqttc.loop_forever()
|