File: async_publish_time.cpp

package info (click to toggle)
paho.mqtt.cpp 1.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,672 kB
  • sloc: cpp: 13,068; ansic: 113; sh: 55; makefile: 22
file content (173 lines) | stat: -rw-r--r-- 5,889 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// async_publish_time.cpp
//
// This is a Paho MQTT C++ client, sample application.
//
// It's a fairly contrived, but useful example of an MQTT data monitor and
// publisher, using the C++ asynchronous client interface. A fairly common
// usage for MQTT applications to monitor a sensor and publish the reading
// when it changes by a "significant" amount (whatever that may be).
// This might be temperature, pressure, humidity, soil moisture, CO2 levels,
// or anything like that.
//
// Since we don't have a universal sensor to use for this example, we simply
// use time itself as out input data. We periodically "sample" the time
// value and when it changes by more than our required delta amount, we
// publish the time. In this case we use the system clock, measuring the
// time with millisecond precision.
//
// The sample demonstrates:
//  - Connecting to an MQTT server/broker
//  - Sampling a value
//  - Publishing messages using a `topic` object
//  - Last will and testament
//  - Callbacks with lambdas (on connect and disconnect)
//  - Using `create_options`
//  - Creating options with builder classes
//  - Offline buffering in the client
//

/*******************************************************************************
 * Copyright (c) 2019-2023 Frank Pagliughi <fpagliughi@mindspring.com>
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v20.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Frank Pagliughi - initial implementation and documentation
 *******************************************************************************/

#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <thread>  // For sleep

#include "mqtt/async_client.h"

using namespace std;
using namespace std::chrono;

const std::string DFLT_SERVER_URI{"mqtt://localhost:1883"};

// The QoS for sending data
const int QOS = 1;

// How often to sample the "data"
const auto SAMPLE_PERIOD = 5ms;

// How much the "data" needs to change before we publish a new value.
const int DELTA_MS = 100;

// How many to buffer while off-line
const int MAX_BUFFERED_MESSAGES = 1200;

// Atomic flag to tell the main loop to exit.
atomic<bool> quit{false};

// Handler for ^C (SIGINT)
void ctrlc_handler(int) { quit = true; }

// --------------------------------------------------------------------------
// Gets the current time as the number of milliseconds since the epoch:
// like a time_t with ms resolution.

uint64_t timestamp()
{
    auto now = system_clock::now();
    auto tse = now.time_since_epoch();
    auto msTm = duration_cast<milliseconds>(tse);
    return uint64_t(msTm.count());
}

// --------------------------------------------------------------------------

int main(int argc, char* argv[])
{
    // The server URI (address)
    string serverURI = (argc > 1) ? string{argv[1]} : DFLT_SERVER_URI;

    // The amount of time to run (in ms). Zero means "run forever".
    uint64_t trun = (argc > 2) ? stoll(argv[2]) : 0LL;

    cout << "Initializing for server '" << serverURI << "'..." << endl;

    // We configure to allow publishing to the client while off-line,
    // and that it's OK to do so before the 1st successful connection.
    auto createOpts = mqtt::create_options_builder()
                          .server_uri(serverURI)
                          .send_while_disconnected(true, true)
                          .max_buffered_messages(MAX_BUFFERED_MESSAGES)
                          .delete_oldest_messages()
                          .finalize();

    mqtt::async_client cli(createOpts);

    // Set callbacks for when connected and connection lost.

    cli.set_connected_handler([&cli](const std::string&) {
        std::cout << "*** Connected (" << timestamp() << ") ***" << std::endl;
    });

    cli.set_connection_lost_handler([&cli](const std::string&) {
        std::cout << "*** Connection Lost (" << timestamp() << ") ***" << std::endl;
    });

    auto willMsg = mqtt::message("test/events", "Time publisher disconnected", 1, true);
    auto connOpts = mqtt::connect_options_builder()
                        .clean_session()
                        .will(willMsg)
                        .keep_alive_interval(10s)
                        .automatic_reconnect(seconds(1), seconds(10))
                        .finalize();

    try {
        // Note that we start the connection, but don't wait for completion.
        // We configured to allow publishing before a successful connection.
        cout << "Starting connection..." << endl;
        cli.connect(connOpts);

        auto top = mqtt::topic(cli, "data/time", QOS);
        cout << "Publishing data..." << endl;

        // Install a ^C handler for user to signal when to exit
        signal(SIGINT, ctrlc_handler);

        // Sync clock to start of delta period
        while (timestamp() % DELTA_MS != 0);

        uint64_t t = timestamp(), tlast = t, tstart = t;
        top.publish(to_string(t));

        while (!quit) {
            this_thread::sleep_for(SAMPLE_PERIOD);

            t = timestamp();

            if (abs(int(t - tlast)) >= DELTA_MS)
                top.publish(to_string(tlast = t));

            if (trun > 0 && t >= (trun + tstart))
                break;
        }

        // Disconnect
        cout << "\nDisconnecting..." << endl;
        cli.disconnect()->wait();
        cout << "  ...OK" << endl;
    }
    catch (const mqtt::exception& exc) {
        cerr << exc.what() << endl;
        return 1;
    }

    return 0;
}