File: sync_publish.cpp

package info (click to toggle)
paho.mqtt.cpp 1.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,672 kB
  • sloc: cpp: 13,068; ansic: 113; sh: 55; makefile: 22
file content (224 lines) | stat: -rw-r--r-- 7,269 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// sync_publish.cpp
//
// This is a Paho MQTT C++ client, sample application.
//
// It's an example of how to send messages as an MQTT publisher using the
// C++ synchronous client interface.
//
// The sample demonstrates:
//  - Connecting to an MQTT server/broker
//  - Publishing messages
//  - User-defined persistence
//

/*******************************************************************************
 * Copyright (c) 2013-2023 Frank Pagliughi <fpagliughi@mindspring.com>
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v20.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Frank Pagliughi - initial implementation and documentation
 *******************************************************************************/

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <string>
#include <vector>

#include "mqtt/client.h"

// Broker address used when no server URI is given on the command line.
const std::string DFLT_SERVER_URI{"mqtt://localhost:1883"};
// Topic that all of the sample messages are published to.
const std::string TOPIC{"hello"};

// Payload of the first message (published through a message pointer).
const std::string PAYLOAD1{"Hello World!"};

// Payloads of the second and third messages, kept as C strings to show the
// raw-buffer style of publishing. The pointers themselves are const so the
// globals cannot be re-pointed at run time.
const char* const PAYLOAD2 = "Hi there!";
const char* const PAYLOAD3 = "Is anyone listening?";

// Quality of service level used for the messages that set one explicitly.
const int QOS = 1;

/////////////////////////////////////////////////////////////////////////////

// Example of a simple, in-memory persistence class.
//
// This is an extremely silly example, because if you want to use
// persistence, you actually need it to be out of process so that if the
// client crashes and restarts, the persistence data still exists.
//
// This is just here to show how the persistence API callbacks work. It maps
// well to key/value stores, like Redis, but only if it's on the local host,
// as it wouldn't make sense to persist data over the network, since that's
// what the MQTT client is trying to do.
//
class sample_mem_persistence : virtual public mqtt::iclient_persistence
{
    // Whether the store is open
    bool open_;

    // Use an STL map to store shared persistence pointers
    // against string keys.
    std::map<std::string, std::string> store_;

public:
    sample_mem_persistence() : open_(false) {}

    // "Open" the store
    void open(const std::string& clientId, const std::string& serverURI) override
    {
        std::cout << "  [Opening persistence store for '" << clientId << "' at '" << serverURI
                  << "']" << std::endl;
        open_ = true;
    }

    // Close the persistent store that was previously opened.
    void close() override
    {
        std::cout << "  [Closing persistence store.]" << std::endl;
        open_ = false;
    }

    // Clears persistence, so that it no longer contains any persisted data.
    void clear() override
    {
        std::cout << "  [Clearing persistence store.]" << std::endl;
        store_.clear();
    }

    // Returns whether or not data is persisted using the specified key.
    bool contains_key(const std::string& key) override
    {
        return store_.find(key) != store_.end();
    }

    // Returns the keys in this persistent data store.
    mqtt::string_collection keys() const override
    {
        mqtt::string_collection ks;
        for (const auto& k : store_) ks.push_back(k.first);
        return ks;
    }

    // Puts the specified data into the persistent store.
    void put(const std::string& key, const std::vector<mqtt::string_view>& bufs) override
    {
        std::cout << "  [Persisting data with key '" << key << "']" << std::endl;
        std::string str;
        for (const auto& b : bufs) str.append(b.data(), b.size());  // += b.str();
        store_[key] = std::move(str);
    }

    // Gets the specified data out of the persistent store.
    std::string get(const std::string& key) const override
    {
        std::cout << "  [Searching persistence for key '" << key << "']" << std::endl;
        auto p = store_.find(key);
        if (p == store_.end())
            throw mqtt::persistence_exception();
        std::cout << "  [Found persistence data for key '" << key << "']" << std::endl;

        return p->second;
    }

    // Remove the data for the specified key.
    void remove(const std::string& key) override
    {
        std::cout << "  [Persistence removing key '" << key << "']" << std::endl;
        auto p = store_.find(key);
        if (p == store_.end())
            throw mqtt::persistence_exception();
        store_.erase(p);
        std::cout << "  [Persistence key removed '" << key << "']" << std::endl;
    }
};

/////////////////////////////////////////////////////////////////////////////
// Class to receive callbacks

class user_callback : public virtual mqtt::callback
{
    // The handlers below are private; they are only reached through the
    // virtual functions of the mqtt::callback base.

    // Reports that the connection was lost, followed by the cause when one
    // was supplied.
    void connection_lost(const std::string& cause) override
    {
        std::cout << "\nConnection lost" << std::endl;
        if (cause.empty()) {
            return;
        }
        std::cout << "\tcause: " << cause << std::endl;
    }

    // Reports the message ID of a completed delivery, or -1 when the token
    // pointer is null.
    void delivery_complete(mqtt::delivery_token_ptr tok) override
    {
        const auto msgId = tok ? tok->get_message_id() : -1;
        std::cout << "\n  [Delivery complete for token: " << msgId << "]" << std::endl;
    }
};

// --------------------------------------------------------------------------

// Sample entry point. Connects to an MQTT broker using the synchronous
// client, publishes three messages to TOPIC through three different
// publish() overloads, then disconnects.
//
// argv[1], when present, is used as the server URI; otherwise
// DFLT_SERVER_URI is used.
//
// Returns 0 on success, or 1 if a persistence or MQTT exception is thrown.
int main(int argc, char* argv[])
{
    auto serverURI = (argc > 1) ? std::string{argv[1]} : DFLT_SERVER_URI;

    std::cout << "Initializing..." << std::endl;
    // The client is given the user-defined persistence object; 'persist' must
    // outlive 'client', which it does since it is declared first.
    sample_mem_persistence persist;
    mqtt::client client(serverURI, "", &persist);

    user_callback cb;
    client.set_callback(cb);

    mqtt::connect_options connOpts;
    connOpts.set_keep_alive_interval(20);
    connOpts.set_clean_session(true);
    std::cout << "...OK" << std::endl;

    try {
        std::cout << "\nConnecting..." << std::endl;
        client.connect(connOpts);
        std::cout << "...OK" << std::endl;

        // First use a message pointer.

        std::cout << "\nSending message..." << std::endl;
        auto pubmsg = mqtt::make_message(TOPIC, PAYLOAD1);
        pubmsg->set_qos(QOS);
        client.publish(pubmsg);
        std::cout << "...OK" << std::endl;

        // Now try with itemized publish (topic, raw buffer, length).
        // Only the text is sent: the C string's terminating NUL is not part
        // of the payload, matching the other two messages.

        std::cout << "\nSending next message..." << std::endl;
        client.publish(TOPIC, PAYLOAD2, std::strlen(PAYLOAD2));
        std::cout << "...OK" << std::endl;

        // Finally, publish a non-heap (temporary) message object.

        std::cout << "\nSending final message..." << std::endl;
        client.publish(mqtt::message(TOPIC, PAYLOAD3, QOS, false));
        std::cout << "...OK" << std::endl;

        // Disconnect
        std::cout << "\nDisconnecting..." << std::endl;
        client.disconnect();
        std::cout << "...OK" << std::endl;
    }
    // Persistence failures are handled separately so that the reason code
    // can be reported along with the message.
    catch (const mqtt::persistence_exception& exc) {
        std::cerr << "Persistence Error: " << exc.what() << " [" << exc.get_reason_code()
                  << "]" << std::endl;
        return 1;
    }
    catch (const mqtt::exception& exc) {
        std::cerr << exc.what() << std::endl;
        return 1;
    }

    std::cout << "\nExiting" << std::endl;
    return 0;
}