1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
/* Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
* SPDX-License-Identifier: GPL-3.0-or-later
*/
#include "kresconfig.h"
#include "daemon/udp_queue.h"
#include "daemon/worker.h"
#include "lib/generic/array.h"
#include "lib/utils.h"
struct qr_task;
#include <sys/socket.h>
#if !ENABLE_SENDMMSG
int udp_queue_init_global(uv_loop_t *loop)
{
return 0;
}
/* Appease the linker in case this unused call isn't optimized out. */
void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
{
abort();
}
#else
/* LATER: it might be useful to have this configurable during runtime,
* but the structures below would have to change a little (broken up). */
#define UDP_QUEUE_LEN 64
/** A queue of up to UDP_QUEUE_LEN messages, meant for the same socket. */
typedef struct {
int len; /**< The number of messages in the queue: 0..UDP_QUEUE_LEN */
struct mmsghdr msgvec[UDP_QUEUE_LEN]; /**< Parameter for sendmmsg() */
struct {
struct qr_task *task; /**< Links for completion callbacks. */
struct iovec msg_iov[1]; /**< storage for .msgvec[i].msg_iov */
} items[UDP_QUEUE_LEN];
} udp_queue_t;
static udp_queue_t * udp_queue_create()
{
udp_queue_t *q = calloc(1, sizeof(*q));
kr_require(q != NULL);
for (int i = 0; i < UDP_QUEUE_LEN; ++i) {
struct msghdr *mhi = &q->msgvec[i].msg_hdr;
/* These shall remain always the same. */
mhi->msg_iov = q->items[i].msg_iov;
mhi->msg_iovlen = 1;
/* msg_name and msg_namelen will be per-call,
* and the rest is OK to remain zeroed all the time. */
}
return q;
}
/** Global state for udp_queue_*. Note: we never free the pointed-to memory. */
struct {
/** Singleton map: fd -> udp_queue_t, as a simple array of pointers. */
udp_queue_t **udp_queues;
int udp_queues_len;
/** List of FD numbers that might have a non-empty queue. */
array_t(int) waiting_fds;
uv_check_t check_handle;
} static state = {0};
/** Empty the given queue. The queue is assumed to exist (but may be empty). */
static void udp_queue_send(int fd)
{
udp_queue_t *const q = state.udp_queues[fd];
if (!q->len) return;
int sent_len = sendmmsg(fd, q->msgvec, q->len, 0);
/* ATM we don't really do anything about failures. */
int err = sent_len < 0 ? errno : EAGAIN /* unknown error, really */;
for (int i = 0; i < q->len; ++i) {
qr_task_on_send(q->items[i].task, NULL, i < sent_len ? 0 : err);
worker_task_unref(q->items[i].task);
}
q->len = 0;
}
/** Periodical callback to send all queued packets. */
static void udp_queue_check(uv_check_t *handle)
{
for (int i = 0; i < state.waiting_fds.len; ++i) {
udp_queue_send(state.waiting_fds.at[i]);
}
state.waiting_fds.len = 0;
}
int udp_queue_init_global(uv_loop_t *loop)
{
int ret = uv_check_init(loop, &state.check_handle);
if (!ret) ret = uv_check_start(&state.check_handle, udp_queue_check);
return ret;
}
void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
{
if (fd < 0) {
kr_log_error(SYSTEM, "ERROR: called udp_queue_push(fd = %d, ...)\n", fd);
abort();
}
worker_task_ref(task);
/* Get a valid correct queue. */
if (fd >= state.udp_queues_len) {
const int new_len = fd + 1;
state.udp_queues = realloc(state.udp_queues,
sizeof(state.udp_queues[0]) * new_len);
if (!state.udp_queues) abort();
memset(state.udp_queues + state.udp_queues_len, 0,
sizeof(state.udp_queues[0]) * (new_len - state.udp_queues_len));
state.udp_queues_len = new_len;
}
if (unlikely(state.udp_queues[fd] == NULL))
state.udp_queues[fd] = udp_queue_create();
udp_queue_t *const q = state.udp_queues[fd];
/* Append to the queue */
struct sockaddr *sa = (struct sockaddr *)/*const-cast*/req->qsource.comm_addr;
q->msgvec[q->len].msg_hdr.msg_name = sa;
q->msgvec[q->len].msg_hdr.msg_namelen = kr_sockaddr_len(sa);
q->items[q->len].task = task;
q->items[q->len].msg_iov[0] = (struct iovec){
.iov_base = req->answer->wire,
.iov_len = req->answer->size,
};
if (q->len == 0)
array_push(state.waiting_fds, fd);
++(q->len);
if (q->len >= UDP_QUEUE_LEN) {
kr_assert(q->len == UDP_QUEUE_LEN);
udp_queue_send(fd);
/* We don't need to search state.waiting_fds;
* anyway, it's more efficient to let the hook do that. */
}
}
#endif
|