1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
|
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_RRDPUSH_H
#define NETDATA_RRDPUSH_H 1
#include "database/rrd.h"
#include "libnetdata/libnetdata.h"
#include "web/server/web_client.h"
#include "daemon/common.h"
#define CONNECTED_TO_SIZE 100
// ----------------------------------------------------------------------------
// obsolete versions - do not use anymore
#define STREAM_OLD_VERSION_CLAIM 3
#define STREAM_OLD_VERSION_CLABELS 4
#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
// ----------------------------------------------------------------------------
// capabilities negotiation
typedef enum {
// do not use the first 3 bits
STREAM_CAP_V1 = (1 << 3), // v1 = the oldest protocol
STREAM_CAP_V2 = (1 << 4), // v2 = the second version of the protocol (with host labels)
STREAM_CAP_VN = (1 << 5), // version negotiation supported (for versions 3, 4, 5 of the protocol)
// v3 = claiming supported
// v4 = chart labels supported
// v5 = lz4 compression supported
STREAM_CAP_VCAPS = (1 << 6), // capabilities negotiation supported
STREAM_CAP_HLABELS = (1 << 7), // host labels supported
STREAM_CAP_CLAIM = (1 << 8), // claiming supported
STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported
STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
STREAM_CAP_REPLICATION = (1 << 12), // replication supported
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
// this must be signed int, so don't use the last bit
// needed for negotiating errors between parent and child
} STREAM_CAPABILITIES;
#ifdef ENABLE_COMPRESSION
#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
#else
#define STREAM_HAS_COMPRESSION 0
#endif // ENABLE_COMPRESSION
#define STREAM_OUR_CAPABILITIES ( \
STREAM_CAP_V1 | STREAM_CAP_V2 | STREAM_CAP_VN | STREAM_CAP_VCAPS | \
STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | \
STREAM_HAS_COMPRESSION | STREAM_CAP_FUNCTIONS | STREAM_CAP_REPLICATION | STREAM_CAP_BINARY )
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)))
// ----------------------------------------------------------------------------
// stream handshake
#define HTTP_HEADER_SIZE 8192
#define STREAMING_PROTOCOL_VERSION "1.1"
#define START_STREAMING_PROMPT_V1 "Hit me baby, push them over..."
#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..."
#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version="
#define START_STREAMING_ERROR_SAME_LOCALHOST "Don't hit me baby, you are trying to stream my localhost back"
#define START_STREAMING_ERROR_ALREADY_STREAMING "This GUID is already streaming to this server"
#define START_STREAMING_ERROR_NOT_PERMITTED "You are not permitted to access this. Check the logs for more info."
typedef enum {
STREAM_HANDSHAKE_OK_V5 = 5, // COMPRESSION
STREAM_HANDSHAKE_OK_V4 = 4, // CLABELS
STREAM_HANDSHAKE_OK_V3 = 3, // CLAIM
STREAM_HANDSHAKE_OK_V2 = 2, // HLABELS
STREAM_HANDSHAKE_OK_V1 = 1,
STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE = -1,
STREAM_HANDSHAKE_ERROR_LOCALHOST = -2,
STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED = -3,
STREAM_HANDSHAKE_ERROR_DENIED = -4,
STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT = -5,
STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT = -6,
STREAM_HANDSHAKE_ERROR_INVALID_CERTIFICATE = -7,
STREAM_HANDSHAKE_ERROR_SSL_ERROR = -8,
STREAM_HANDSHAKE_ERROR_CANT_CONNECT = -9
} STREAM_HANDSHAKE;
// ----------------------------------------------------------------------------
typedef struct {
char *os_name;
char *os_id;
char *os_version;
char *kernel_name;
char *kernel_version;
} stream_encoded_t;
#ifdef ENABLE_COMPRESSION
struct compressor_state {
char *compression_result_buffer;
size_t compression_result_buffer_size;
struct compressor_data *data; // Compression API specific data
void (*reset)(struct compressor_state *state);
size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
void (*destroy)(struct compressor_state **state);
};
struct decompressor_state {
size_t signature_size;
size_t total_compressed;
size_t total_uncompressed;
size_t packet_count;
struct decompressor_stream *stream; // Decompression API specific data
void (*reset)(struct decompressor_state *state);
size_t (*start)(struct decompressor_state *state, const char *header, size_t header_size);
size_t (*decompress)(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
size_t (*decompressed_bytes_in_buffer)(struct decompressor_state *state);
size_t (*get)(struct decompressor_state *state, char *data, size_t size);
void (*destroy)(struct decompressor_state **state);
};
#endif
// Thread-local storage
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
typedef enum {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
struct sender_state {
RRDHOST *host;
pid_t tid; // the thread id of the sender, from gettid()
SENDER_FLAGS flags;
int timeout;
int default_port;
usec_t reconnect_delay;
char connected_to[CONNECTED_TO_SIZE + 1]; // We don't know which proxy we connect to, passed back from socket.c
size_t begin;
size_t reconnects_counter;
size_t sent_bytes;
size_t sent_bytes_on_this_connection;
size_t send_attempts;
time_t last_traffic_seen_t;
size_t not_connected_loops;
// Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
// the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
netdata_mutex_t mutex;
struct circular_buffer *buffer;
char read_buffer[PLUGINSD_LINE_MAX + 1];
int read_len;
STREAM_CAPABILITIES capabilities;
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
int rrdpush_sender_socket;
#ifdef ENABLE_COMPRESSION
struct compressor_state *compressor;
#endif
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl; // structure used to encrypt the connection
#endif
struct {
DICTIONARY *requests; // de-duplication of replication requests, per chart
struct {
size_t pending_requests; // the currently outstanding replication requests
size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
bool reached_max; // true when the sender buffer should not get more replication responses
} atomic;
} replication;
struct {
size_t buffer_used_percentage; // the current utilization of the sending buffer
usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
} atomic;
};
#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED)
#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)
#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED)
#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)
#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
struct receiver_state {
RRDHOST *host;
netdata_thread_t thread;
int fd;
char *key;
char *hostname;
char *registry_hostname;
char *machine_guid;
char *os;
char *timezone; // Unused?
char *abbrev_timezone;
int32_t utc_offset;
char *tags;
char *client_ip; // Duplicated in pluginsd
char *client_port; // Duplicated in pluginsd
char *program_name; // Duplicated in pluginsd
char *program_version;
struct rrdhost_system_info *system_info;
int update_every;
STREAM_CAPABILITIES capabilities;
time_t last_msg_t;
char read_buffer[PLUGINSD_LINE_MAX + 1];
int read_len;
unsigned int shutdown:1; // Tell the thread to exit
unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl;
#endif
#ifdef ENABLE_COMPRESSION
unsigned int rrdpush_compression;
struct decompressor_state *decompressor;
#endif
time_t replication_first_time_t;
};
struct rrdpush_destinations {
STRING *destination;
const char *last_error;
time_t postpone_reconnection_until;
STREAM_HANDSHAKE last_handshake;
struct rrdpush_destinations *prev;
struct rrdpush_destinations *next;
};
extern unsigned int default_rrdpush_enabled;
#ifdef ENABLE_COMPRESSION
extern unsigned int default_compression_enabled;
#endif
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
extern bool default_rrdpush_enable_replication;
extern time_t default_rrdpush_seconds_to_replicate;
extern time_t default_rrdpush_replication_step;
extern unsigned int remote_clock_resync_iterations;
void rrdpush_destinations_init(RRDHOST *host);
void rrdpush_destinations_free(RRDHOST *host);
void sender_init(RRDHOST *host);
BUFFER *sender_start(struct sender_state *s);
void sender_commit(struct sender_state *s, BUFFER *wb);
void sender_cancel(struct sender_state *s);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
void rrdset_done_push(RRDSET *st);
bool rrdset_push_chart_definition_now(RRDSET *st);
void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
void rrdpush_claimed_id(RRDHOST *host);
int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
void rrdpush_sender_thread_stop(RRDHOST *host);
void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, const RRDVAR_ACQUIRED *rva);
void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg);
int connect_to_one_of_destinations(
RRDHOST *host,
int default_port,
struct timeval *timeout,
size_t *reconnects_counter,
char *connected_to,
size_t connected_to_size,
struct rrdpush_destinations **destination);
void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
#ifdef ENABLE_COMPRESSION
struct compressor_state *create_compressor();
struct decompressor_state *create_decompressor();
#endif
void log_receiver_capabilities(struct receiver_state *rpt);
void log_sender_capabilities(struct sender_state *s);
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version);
int32_t stream_capabilities_to_vn(uint32_t caps);
#include "replication.h"
#endif //NETDATA_RRDPUSH_H
|