1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
|
#include <uwsgi.h>
/*
this is a stats pusher plugin for sending metrics over a UDP socket
--stats-push socket:address[,prefix]
example:
--stats-push socket:127.0.0.1:8125,myinstance
it exports values exposed by the metric subsystem
(it is based on the statsd plugin)
*/
// global server state, defined outside this file; this plugin reads its
// page_size, metrics list and metrics_lock
extern struct uwsgi_server uwsgi;
// configuration of a socket node: per-pusher-instance state, allocated on the
// first push and stored in the instance's data pointer
struct socket_node {
// datagram socket (AF_INET, SOCK_DGRAM) used for every sendto()
int fd;
// destination address, filled in by socket_to_in_addr()
union uwsgi_sockaddr addr;
// address length returned by socket_to_in_addr(), passed to sendto()
socklen_t addr_len;
// string prepended to every metric name; points either into the pusher
// argument (just after the comma) or to the literal "uwsgi", so it is
// never freed on its own
char *prefix;
// length of prefix in bytes (not counting the terminator)
uint16_t prefix_len;
};
/*
 * Format one metric as "<prefix>.<name> <type> <value>" into ub and send it
 * as a single datagram to the node stored in uspi->data.
 *
 * ub is reused across calls: its position is rewound before formatting.
 *
 * Returns -1 if the buffer could not be filled (nothing is sent in that case),
 * 0 otherwise. A failing sendto() is only logged and still yields 0.
 */
static int socket_send_metric(struct uwsgi_buffer *ub, struct uwsgi_stats_pusher_instance *uspi, struct uwsgi_metric *um) {
	struct socket_node *node = (struct socket_node *) uspi->data;

	// start a fresh packet in the shared buffer
	ub->pos = 0;

	// build the packet; evaluation stops at the first append that fails
	int overflow = uwsgi_buffer_append(ub, node->prefix, node->prefix_len)
		|| uwsgi_buffer_append(ub, ".", 1)
		|| uwsgi_buffer_append(ub, um->name, um->name_len)
		|| uwsgi_buffer_append(ub, " ", 1)
		|| uwsgi_buffer_num64(ub, (int64_t) um->type)
		|| uwsgi_buffer_append(ub, " ", 1)
		|| uwsgi_buffer_num64(ub, *um->value);
	if (overflow) {
		return -1;
	}

	// best effort delivery: report the error but do not propagate it
	if (sendto(node->fd, ub->buf, ub->pos, 0, (struct sockaddr *) &node->addr.sa_in, node->addr_len) < 0) {
		uwsgi_error("socket_send_metric()/sendto()");
	}

	return 0;
}
/*
 * Stats pusher callback for the "socket" pusher.
 *
 * On the first call for an instance it parses uspi->arg ("address:port[,prefix]"),
 * creates the datagram socket and stores the resulting socket_node in
 * uspi->data. If parsing or socket creation fails, the error is logged, nothing
 * is stored, and the setup is attempted again on the next call.
 *
 * On every call it then walks the uwsgi.metrics list and sends one packet per
 * metric through socket_send_metric(). Metrics flagged reset_after_push are
 * set back to their initial value once sent.
 *
 * now, json and json_len are part of the callback signature and are not used here.
 */
static void stats_pusher_socket(struct uwsgi_stats_pusher_instance *uspi, time_t now, char *json, size_t json_len) {

	if (!uspi->configured) {
		struct socket_node *sn = uwsgi_calloc(sizeof(struct socket_node));
		char *comma = strchr(uspi->arg, ',');
		if (comma) {
			sn->prefix = comma + 1;
			sn->prefix_len = strlen(sn->prefix);
			// cut the argument at the comma so only the address is parsed;
			// the comma is put back on every exit path below
			*comma = 0;
		}
		else {
			sn->prefix = "uwsgi";
			sn->prefix_len = 5;
		}

		char *colon = strchr(uspi->arg, ':');
		if (!colon) {
			uwsgi_log("invalid socket address %s\n", uspi->arg);
			if (comma) *comma = ',';
			free(sn);
			return;
		}

		sn->addr_len = socket_to_in_addr(uspi->arg, colon, 0, &sn->addr.sa_in);

		sn->fd = socket(AF_INET, SOCK_DGRAM, 0);
		if (sn->fd < 0) {
			uwsgi_error("stats_pusher_socket()/socket()");
			if (comma) *comma = ',';
			free(sn);
			return;
		}

		uwsgi_socket_nb(sn->fd);

		if (comma) *comma = ',';
		uspi->data = sn;
		uspi->configured = 1;
	}

	// we use the same buffer for all of the packets
	struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
	struct uwsgi_metric *um = uwsgi.metrics;
	while (um) {
		if (um->reset_after_push) {
			// send and reset inside ONE write-locked section: releasing the
			// lock between the two would let an update slip in after the
			// value was sent and then be wiped by the reset, unreported
			uwsgi_wlock(uwsgi.metrics_lock);
			// the return value is ignored: pushing is best effort
			socket_send_metric(ub, uspi, um);
			*um->value = um->initial_value;
			uwsgi_rwunlock(uwsgi.metrics_lock);
		}
		else {
			// read-only access, a shared lock is enough
			uwsgi_rlock(uwsgi.metrics_lock);
			socket_send_metric(ub, uspi, um);
			uwsgi_rwunlock(uwsgi.metrics_lock);
		}
		um = um->next;
	}
	uwsgi_buffer_destroy(ub);
}
/*
 * Plugin load hook: registers stats_pusher_socket() as the "socket" stats
 * pusher and marks it raw, since it emits its own format instead of the JSON one.
 */
static void stats_pusher_socket_init(void) {
	uwsgi_register_stats_pusher("socket", stats_pusher_socket)->raw = 1;
}
// plugin descriptor: the only hook used is on_load, which registers the pusher
struct uwsgi_plugin stats_pusher_socket_plugin = {
	.on_load = stats_pusher_socket_init,
	.name = "stats_pusher_socket",
};
|