1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
/*
Copyright (c) 2020 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
#include <errno.h>
#include <fcntl.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <mosquitto.h>
#include <mqtt_protocol.h>
#include "mosquitto_ctrl.h"
static int run = 1;
static void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg, const mosquitto_property *properties)
{
struct mosq_ctrl *ctrl = obj;
UNUSED(properties);
if(ctrl->payload_callback){
ctrl->payload_callback(ctrl, msg->payloadlen, msg->payload);
}
mosquitto_disconnect_v5(mosq, 0, NULL);
run = 0;
}
static void on_publish(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties)
{
UNUSED(obj);
UNUSED(mid);
UNUSED(properties);
if(reason_code > 127){
fprintf(stderr, "Publish error: %s\n", mosquitto_reason_string(reason_code));
run = 0;
mosquitto_disconnect_v5(mosq, 0, NULL);
}
}
static void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos, const mosquitto_property *properties)
{
struct mosq_ctrl *ctrl = obj;
UNUSED(mid);
UNUSED(properties);
if(qos_count == 1){
if(granted_qos[0] < 128){
/* Success */
mosquitto_publish(mosq, NULL, ctrl->request_topic, (int)strlen(ctrl->payload), ctrl->payload, ctrl->cfg.qos, 0);
free(ctrl->request_topic);
ctrl->request_topic = NULL;
free(ctrl->payload);
ctrl->payload = NULL;
}else{
if(ctrl->cfg.protocol_version == MQTT_PROTOCOL_V5){
fprintf(stderr, "Subscribe error: %s\n", mosquitto_reason_string(granted_qos[0]));
}else{
fprintf(stderr, "Subscribe error: Subscription refused.\n");
}
run = 0;
mosquitto_disconnect_v5(mosq, 0, NULL);
}
}else{
run = 0;
mosquitto_disconnect_v5(mosq, 0, NULL);
}
}
static void on_connect(struct mosquitto *mosq, void *obj, int reason_code, int flags, const mosquitto_property *properties)
{
struct mosq_ctrl *ctrl = obj;
UNUSED(flags);
UNUSED(properties);
if(reason_code == 0){
if(ctrl->response_topic){
mosquitto_subscribe(mosq, NULL, ctrl->response_topic, ctrl->cfg.qos);
free(ctrl->response_topic);
ctrl->response_topic = NULL;
}
}else{
if(ctrl->cfg.protocol_version == MQTT_PROTOCOL_V5){
if(reason_code == MQTT_RC_UNSUPPORTED_PROTOCOL_VERSION){
fprintf(stderr, "Connection error: %s. Try connecting to an MQTT v5 broker, or use MQTT v3.x mode.\n", mosquitto_reason_string(reason_code));
}else{
fprintf(stderr, "Connection error: %s\n", mosquitto_reason_string(reason_code));
}
}else{
fprintf(stderr, "Connection error: %s\n", mosquitto_connack_string(reason_code));
}
run = 0;
mosquitto_disconnect_v5(mosq, 0, NULL);
}
}
int client_request_response(struct mosq_ctrl *ctrl)
{
struct mosquitto *mosq;
int rc;
time_t start;
if(ctrl->cfg.cafile == NULL && ctrl->cfg.capath == NULL && !ctrl->cfg.tls_use_os_certs && ctrl->cfg.port != 8883
# ifdef FINAL_WITH_TLS_PSK
&& !ctrl->cfg.psk
# endif
){
fprintf(stderr, "Warning: You are running mosquitto_ctrl without encryption.\nThis means all of the configuration changes you are making are visible on the network, including passwords.\n\n");
}
mosquitto_lib_init();
mosq = mosquitto_new(ctrl->cfg.id, true, ctrl);
rc = client_opts_set(mosq, &ctrl->cfg);
if(rc) goto cleanup;
mosquitto_connect_v5_callback_set(mosq, on_connect);
mosquitto_subscribe_v5_callback_set(mosq, on_subscribe);
mosquitto_publish_v5_callback_set(mosq, on_publish);
mosquitto_message_v5_callback_set(mosq, on_message);
rc = client_connect(mosq, &ctrl->cfg);
if(rc) goto cleanup;
start = time(NULL);
while(run && start+10 > time(NULL)){
mosquitto_loop(mosq, -1, 1);
}
cleanup:
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return rc;
}
|