File: rpc_math_srvr.cpp

package info (click to toggle)
paho.mqtt.cpp 1.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,672 kB
  • sloc: cpp: 13,068; ansic: 113; sh: 55; makefile: 22
file content (193 lines) | stat: -rw-r--r-- 5,934 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
// rpc_math_srvr.cpp
//
// This is a Paho MQTT C++ client, sample application.
//
// This application is an MQTT consumer/subscriber using the C++ synchronous
// client interface, which uses the queuing API to receive messages.
//
// The sample demonstrates:
//  - Connecting to an MQTT server/broker
//  - Subscribing to multiple topics
//  - Receiving messages through the queueing consumer API
//  - Receiving and acting upon commands via MQTT topics
//  - Manual reconnects
//  - Starting with a clean session (MQTT v5 clean start)
//  - Replying to requests on the MQTT v5 Response Topic supplied by the client
//

/*******************************************************************************
 * Copyright (c) 2019-2023 Frank Pagliughi <fpagliughi@mindspring.com>
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v20.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Frank Pagliughi - initial implementation and documentation
 *******************************************************************************/

#include <cctype>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>

#include "mqtt/client.h"

using namespace std;
using namespace std::chrono;

const string SERVER_ADDRESS{"mqtt://localhost:1883"};
const string CLIENT_ID{"rpc_math_srvr"};

constexpr auto RESPONSE_TOPIC = mqtt::property::RESPONSE_TOPIC;
constexpr auto CORRELATION_DATA = mqtt::property::CORRELATION_DATA;

// --------------------------------------------------------------------------
// Simple function to manually reconnect a client.

// Attempts to re-establish the connection of a disconnected client.
//
// Makes up to N_ATTEMPT calls to cli.reconnect(), sleeping for one second
// after each call that throws an mqtt::exception. The loop also stops early
// if the client reports that it is already connected.
//
// Returns true if the client is connected when the function returns, and
// false if all the attempts were used up without getting a connection.
bool try_reconnect(mqtt::client& cli)
{
    constexpr int N_ATTEMPT = 30;

    for (int i = 0; i < N_ATTEMPT && !cli.is_connected(); ++i) {
        try {
            cli.reconnect();
            return true;
        }
        catch (const mqtt::exception&) {
            // Wait a little before the next attempt.
            this_thread::sleep_for(seconds(1));
        }
    }

    // The loop can also end because the client was found to be connected,
    // so report the real connection state rather than always failing.
    return cli.is_connected();
}

// --------------------------------------------------------------------------
// RPC function implementations

// Returns the sum of all the values in `nums`, accumulated front to back.
// An empty list gives 0.0.
double add(const std::vector<double>& nums)
{
    double total = 0.0;
    for (auto it = nums.cbegin(); it != nums.cend(); ++it) {
        total += *it;
    }
    return total;
}

// Returns the product of all the values in `nums`, multiplied front to back.
// An empty list gives 1.0.
double mult(const std::vector<double>& nums)
{
    double product = 1.0;
    for (std::vector<double>::size_type i = 0; i < nums.size(); ++i) {
        product *= nums[i];
    }
    return product;
}

/////////////////////////////////////////////////////////////////////////////

int main(int argc, char* argv[])
{
    mqtt::client cli(SERVER_ADDRESS, CLIENT_ID);

    auto connOpts = mqtt::connect_options_builder::v5()
                        .keep_alive_interval(seconds(20))
                        .clean_start()
                        .finalize();

    const vector<string> TOPICS{"requests/math", "requests/math/#"};
    const vector<int> QOS{1, 1};

    try {
        cout << "Connecting to the MQTT server..." << flush;
        cli.connect(connOpts);
        cli.subscribe(TOPICS, QOS);
        cout << "OK\n" << endl;

        // Consume messages

        cout << "Waiting for RPC requests..." << endl;
        while (true) {
            auto msg = cli.consume_message();

            if (!msg) {
                if (!cli.is_connected()) {
                    cout << "Lost connection. Attempting reconnect" << endl;
                    if (try_reconnect(cli)) {
                        cli.subscribe(TOPICS, QOS);
                        cout << "Reconnected" << endl;
                        continue;
                    }
                    else {
                        cout << "Reconnect failed." << endl;
                        break;
                    }
                }
                else
                    break;
            }

            cout << "Received a request" << endl;

            const mqtt::properties& props = msg->get_properties();

            if (props.contains(RESPONSE_TOPIC) && props.contains(CORRELATION_DATA)) {
                mqtt::binary corr_id = mqtt::get<string>(props, CORRELATION_DATA);
                string reply_to = mqtt::get<string>(props, RESPONSE_TOPIC);

                cout << "Client wants a reply to [" << corr_id << "] on '" << reply_to << "'"
                     << endl;

                cout << msg->get_topic() << ": " << msg->to_string() << endl;

                char c;
                double x;
                vector<double> nums;

                istringstream is(msg->to_string());
                if (!(is >> c) || c != '[') {
                    cout << "Malformed arguments" << endl;
                    // Maybe send an error message to client.
                    continue;
                }

                c = ',';
                while (c == ',' && (is >> x >> c)) nums.push_back(x);

                if (c != ']') {
                    cout << "Bad closing delimiter" << endl;
                    continue;
                }

                x = 0.0;
                if (msg->get_topic() == "requests/math/add")
                    x = add(nums);
                else if (msg->get_topic() == "requests/math/mult")
                    x = mult(nums);
                else {
                    cout << "Unknown request: " << msg->get_topic() << endl;
                    continue;
                }

                cout << "  Result: " << x << endl;

                auto reply_msg = mqtt::message::create(reply_to, to_string(x), 1, false);
                cli.publish(reply_msg);
            }
        }

        // Disconnect

        cout << "\nDisconnecting from the MQTT server..." << flush;
        cli.disconnect();
        cout << "OK" << endl;
    }
    catch (const mqtt::exception& exc) {
        cerr << exc.what() << endl;
        return 1;
    }

    return 0;
}