File: rpc_math_cli.cpp

package info (click to toggle)
paho.mqtt.cpp 1.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,672 kB
  • sloc: cpp: 13,068; ansic: 113; sh: 55; makefile: 22
file content (152 lines) | stat: -rw-r--r-- 4,593 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// rpc_math_cli.cpp
//
// This is a Paho MQTT v5 C++ sample application.
//
// It's an example of how to create a client for performing remote procedure
// calls using MQTT with the 'response topic' and 'correlation data'
// properties.
//
// The sample demonstrates:
//  - Connecting to an MQTT server/broker
//  - Using MQTT v5 properties
//  - Publishing RPC request messages
//  - Using asynchronous tokens
//	- Subscribing to reply topic
//

/*******************************************************************************
 * Copyright (c) 2019-2024 Frank Pagliughi <fpagliughi@mindspring.com>
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v20.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Frank Pagliughi - initial implementation and documentation
 *******************************************************************************/

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>

#include "mqtt/async_client.h"
#include "mqtt/properties.h"

using namespace std;
using namespace std::chrono;

const string SERVER_ADDRESS{"mqtt://localhost:1883"};
const auto TIMEOUT = std::chrono::seconds(10);

/////////////////////////////////////////////////////////////////////////////

int main(int argc, char* argv[])
{
    if (argc < 4) {
        cout << "USAGE: rpc_math_cli <add|mult> <num1> <num2> [... numN]" << endl;
        return 1;
    }

    constexpr int QOS = 1;
    const string REQ_TOPIC_HDR{"requests/math/"};

    // Create a client
    mqtt::async_client cli(SERVER_ADDRESS, "");

    cli.start_consuming();

    try {
        cout << "Connecting..." << flush;
        auto connOpts = mqtt::connect_options::v5();

        mqtt::token_ptr tok = cli.connect(connOpts);
        auto connRsp = tok->get_connect_response();
        cout << "OK (" << connRsp.get_server_uri() << ")" << endl;

        // Since we gave an empty client ID, the server should create a
        // unique one for us and send it back as ASSIGNED_CLIENT_IDENTIFIER
        // in the connect properties.

        string clientId =
            get<string>(connRsp.get_properties(), mqtt::property::ASSIGNED_CLIENT_IDENTIFIER);

        // So now we can create a unique RPC response topic using
        // the assigned (unique) client ID.

        string repTopic = "replies/" + clientId + "/math";
        cout << "    Reply topic: " << repTopic << endl;

        // Subscribe to the reply topic and verify the QoS

        tok = cli.subscribe(repTopic, QOS);
        tok->wait();

        if (int(tok->get_reason_code()) != QOS) {
            cerr << "Error: Server doesn't support reply QoS: [" << tok->get_reason_code()
                 << "]" << endl;
            return 2;
        }

        // Create and send the request message

        string req{argv[1]}, reqTopic{REQ_TOPIC_HDR + req};

        mqtt::properties props{
            {mqtt::property::RESPONSE_TOPIC, repTopic},
            {mqtt::property::CORRELATION_DATA, "1"}
        };

        ostringstream os;
        os << "[ ";
        for (int i = 2; i < argc - 1; ++i) os << argv[i] << ", ";
        os << argv[argc - 1] << " ]";

        string reqArgs{os.str()};

        cout << "\nSending '" << req << "' request " << os.str() << "..." << flush;
        auto pubmsg = mqtt::message_ptr_builder()
                          .topic(reqTopic)
                          .payload(reqArgs)
                          .qos(QOS)
                          .properties(props)
                          .finalize();

        cli.publish(pubmsg)->wait_for(TIMEOUT);
        cout << "OK" << endl;

        // Wait for reply.

        auto msg = cli.try_consume_message_for(seconds(5));
        if (!msg) {
            cerr << "Didn't receive a reply from the service." << endl;
            return 1;
        }

        cout << "  Result: " << msg->to_string() << endl;

        // Unsubscribe

        cli.unsubscribe(repTopic)->wait();

        // Disconnect
        cout << "\nDisconnecting..." << flush;
        cli.disconnect()->wait();
        cout << "OK" << endl;
    }
    catch (const mqtt::exception& exc) {
        cerr << exc.what() << endl;
        return 1;
    }

    return 0;
}