1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
|
/*
* Copyright (C) 2010-2025 Red Hat, Inc. All rights reserved.
*
* Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
* Federico Simoncelli <fsimon@kronosnet.org>
*
* This software licensed under LGPL-2.0+
*/
#ifndef __KNET_INTERNALS_H__
#define __KNET_INTERNALS_H__
/*
* NOTE: you shouldn't need to include this header normally
*/
#include <pthread.h>
#include <stddef.h>
#include <qb/qblist.h>
#include "libknet.h"
#include "onwire.h"
#include "compat.h"
#include "threads_common.h"
#define KNET_DATABUFSIZE KNET_MAX_PACKET_SIZE + KNET_HEADER_ALL_SIZE
#define KNET_DATABUFSIZE_CRYPT_PAD 1024
#define KNET_DATABUFSIZE_CRYPT KNET_DATABUFSIZE + KNET_DATABUFSIZE_CRYPT_PAD
#define KNET_DATABUFSIZE_COMPRESS_PAD 1024
#define KNET_DATABUFSIZE_COMPRESS KNET_DATABUFSIZE + KNET_DATABUFSIZE_COMPRESS_PAD
#define KNET_RING_RCVBUFF 8388608
#define PCKT_FRAG_MAX UINT8_MAX
#define PCKT_RX_BUFS 512
#define KNET_EPOLL_MAX_EVENTS KNET_DATAFD_MAX + 1
/*
* Size of threads stack. Value is choosen by experimenting, how much is needed
* to sucesfully finish test suite, and at the time of writing patch it was
* ~300KiB. To have some room for future enhancement it is increased
* by factor of 3 and rounded.
*/
#define KNET_THREAD_STACK_SIZE (1024 * 1024)
typedef void *knet_transport_link_t; /* per link transport handle */
typedef void *knet_transport_t; /* per knet_h transport handle */
struct knet_transport_ops; /* Forward because of circular dependancy */
struct knet_mmsghdr {
struct msghdr msg_hdr; /* Message header */
unsigned int msg_len; /* Number of bytes transmitted */
};
struct knet_link {
/* required */
struct sockaddr_storage src_addr;
struct sockaddr_storage dst_addr;
/* configurable */
unsigned int dynamic; /* see KNET_LINK_DYN_ define above */
uint8_t priority; /* higher priority == preferred for A/P */
unsigned long long ping_interval; /* interval */
unsigned long long pong_timeout; /* timeout */
unsigned long long pong_timeout_adj; /* timeout adjusted for latency */
uint8_t pong_timeout_backoff; /* see link.h for definition */
unsigned int latency_max_samples; /* precision */
unsigned int latency_cur_samples;
uint8_t pong_count; /* how many ping/pong to send/receive before link is up */
uint64_t flags;
void *access_list_match_entry_head; /* pointer to access list match_entry list head */
/* status */
struct knet_link_status status;
/* internals */
pthread_mutex_t link_stats_mutex; /* used to update link stats */
uint8_t link_id;
uint8_t transport; /* #defined constant from API */
knet_transport_link_t transport_link; /* link_info_t from transport */
int outsock;
unsigned int configured:1; /* set to 1 if src/dst have been configured transport initialized on this link*/
unsigned int transport_connected:1; /* set to 1 if lower level transport is connected */
uint8_t received_pong;
struct timespec ping_last;
/* used by PMTUD thread as temp per-link variables and should always contain the onwire_len value! */
uint32_t proto_overhead; /* IP + UDP/SCTP overhead. NOT to be confused
with stats.proto_overhead that includes also knet headers
and crypto headers */
struct timespec pmtud_last;
uint32_t last_ping_size;
uint32_t last_good_mtu;
uint32_t last_bad_mtu;
uint32_t last_sent_mtu;
uint32_t last_recv_mtu;
uint32_t pmtud_crypto_timeout_multiplier;/* used by PMTUd to adjust timeouts on high loads */
uint8_t has_valid_mtu;
};
#define KNET_DEFRAG_BUFFERS 32
#define KNET_CBUFFER_SIZE 4096
struct knet_host_defrag_buf {
char buf[KNET_DATABUFSIZE];
uint8_t in_use; /* 0 buffer is free, 1 is in use */
seq_num_t pckt_seq; /* identify the pckt we are receiving */
uint8_t frag_recv; /* how many frags did we receive */
uint8_t frag_map[PCKT_FRAG_MAX];/* bitmap of what we received? */
uint8_t last_first; /* special case if we receive the last fragment first */
ssize_t frag_size; /* normal frag size (not the last one) */
ssize_t last_frag_size; /* the last fragment might not be aligned with MTU size */
struct timespec last_update; /* keep time of the last pckt */
};
struct knet_host {
/* required */
knet_node_id_t host_id;
/* configurable */
uint8_t link_handler_policy;
char name[KNET_MAX_HOST_LEN];
/* status */
struct knet_host_status status;
/* internals */
char circular_buffer[KNET_CBUFFER_SIZE];
seq_num_t rx_seq_num;
seq_num_t untimed_rx_seq_num;
seq_num_t timed_rx_seq_num;
uint8_t got_data;
/* defrag/reassembly buffers */
struct knet_host_defrag_buf defrag_buf[KNET_DEFRAG_BUFFERS];
char circular_buffer_defrag[KNET_CBUFFER_SIZE];
/* link stuff */
struct knet_link link[KNET_MAX_LINK];
uint8_t active_link_entries;
uint8_t active_links[KNET_MAX_LINK];
struct knet_host *next;
};
struct knet_sock {
int sockfd[2]; /* sockfd[0] will always be application facing
* and sockfd[1] internal if sockpair has been created by knet */
int is_socket; /* check if it's a socket for recvmmsg usage */
int is_created; /* knet created this socket and has to clean up on exit/del */
int in_use; /* set to 1 if it's use, 0 if free */
int has_error; /* set to 1 if there were errors reading from the sock
* and socket has been removed from epoll */
};
struct knet_fd_trackers {
uint8_t transport; /* transport type (UDP/SCTP...) */
uint8_t data_type; /* internal use for transport to define what data are associated
* with this fd */
socklen_t sockaddr_len; /* Size of sockaddr_in[6] structure for this socket */
void *data; /* pointer to the data */
int ifindex; /* interface index for this bound address */
};
#define KNET_MAX_FDS KNET_MAX_HOST * KNET_MAX_LINK * 4
#define KNET_MAX_COMPRESS_METHODS UINT8_MAX
#define KNET_MAX_CRYPTO_INSTANCES 2
struct knet_handle_stats_extra {
uint64_t tx_crypt_pmtu_packets;
uint64_t tx_crypt_pmtu_reply_packets;
uint64_t tx_crypt_ping_packets;
uint64_t tx_crypt_pong_packets;
};
struct knet_handle {
knet_node_id_t host_id;
unsigned int enabled:1;
struct knet_sock sockfd[KNET_DATAFD_MAX + 1];
int logfd;
uint8_t log_levels[KNET_MAX_SUBSYSTEMS];
int dstsockfd[2];
int send_to_links_epollfd;
int recv_from_links_epollfd;
int dst_link_handler_epollfd;
uint8_t use_access_lists; /* set to 0 for disable, 1 for enable */
unsigned int pmtud_interval;
unsigned int manual_mtu;
unsigned int data_mtu; /* contains the max data size that we can send onwire
* without frags */
uint8_t prio_dscp; /* use this dscp value for KNET_LINK_FLAG_TRAFFICHIPRIO */
struct knet_host *host_head;
struct knet_host *host_index[KNET_MAX_HOST];
knet_transport_t transports[KNET_MAX_TRANSPORTS+1];
struct knet_fd_trackers knet_transport_fd_tracker[KNET_MAX_FDS]; /* track status for each fd handled by transports */
struct knet_handle_stats stats;
struct knet_handle_stats_extra stats_extra;
pthread_mutex_t handle_stats_mutex; /* used to protect handle stats */
uint32_t reconnect_int;
knet_node_id_t host_ids[KNET_MAX_HOST];
size_t host_ids_entries;
struct knet_header *recv_from_sock_buf;
struct knet_header *send_to_links_buf[PCKT_FRAG_MAX];
struct knet_header *recv_from_links_buf[PCKT_RX_BUFS];
struct knet_header *pingbuf;
struct knet_header *pmtudbuf;
uint8_t threads_status[KNET_THREAD_MAX];
uint8_t threads_flush_queue[KNET_THREAD_MAX];
pthread_mutex_t threads_status_mutex;
pthread_t send_to_links_thread;
pthread_t recv_from_links_thread;
pthread_t heartbt_thread;
pthread_t dst_link_handler_thread;
pthread_t pmtud_link_handler_thread;
pthread_rwlock_t global_rwlock; /* global config lock */
pthread_mutex_t pmtud_mutex; /* pmtud mutex to handle conditional send/recv + timeout */
pthread_cond_t pmtud_cond; /* conditional for above */
pthread_mutex_t tx_mutex; /* used to protect knet_send_sync and TX thread */
pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */
pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */
pthread_mutex_t kmtu_mutex; /* used to protect kernel_mtu */
uint32_t kernel_mtu; /* contains the MTU detected by the kernel on a given link */
int pmtud_waiting;
int pmtud_running;
int pmtud_forcerun;
int pmtud_abort;
struct crypto_instance *crypto_instance[KNET_MAX_CRYPTO_INSTANCES + 1]; /* store an extra pointer to allow 0|1|2 values without too much magic in the code */
uint8_t crypto_in_use_config; /* crypto config to use for TX */
uint8_t crypto_only; /* allow only crypto (1) or also clear (0) traffic */
size_t sec_block_size;
size_t sec_hash_size;
size_t sec_salt_size;
unsigned char *send_to_links_buf_crypt[PCKT_FRAG_MAX];
unsigned char *recv_from_links_buf_crypt;
unsigned char *recv_from_links_buf_decrypt;
unsigned char *pingbuf_crypt;
unsigned char *pmtudbuf_crypt;
int compress_model;
int compress_level;
size_t compress_threshold;
void *compress_int_data[KNET_MAX_COMPRESS_METHODS]; /* for compress method private data */
unsigned char *recv_from_links_buf_decompress;
unsigned char *send_to_links_buf_compress;
seq_num_t tx_seq_num;
pthread_mutex_t tx_seq_num_mutex;
uint8_t has_loop_link;
uint8_t loop_link;
void *dst_host_filter_fn_private_data;
int (*dst_host_filter_fn) (
void *private_data,
const unsigned char *outdata,
ssize_t outdata_len,
uint8_t tx_rx,
knet_node_id_t this_host_id,
knet_node_id_t src_node_id,
int8_t *channel,
knet_node_id_t *dst_host_ids,
size_t *dst_host_ids_entries);
void *pmtud_notify_fn_private_data;
void (*pmtud_notify_fn) (
void *private_data,
unsigned int data_mtu);
void *host_status_change_notify_fn_private_data;
void (*host_status_change_notify_fn) (
void *private_data,
knet_node_id_t host_id,
uint8_t reachable,
uint8_t remote,
uint8_t external);
void *sock_notify_fn_private_data;
void (*sock_notify_fn) (
void *private_data,
int datafd,
int8_t channel,
uint8_t tx_rx,
int error,
int errorno);
int fini_in_progress;
uint64_t flags;
struct qb_list_head list;
const char *plugin_path;
};
struct handle_tracker {
struct qb_list_head head;
};
/*
* lib_config stuff shared across everything
*/
extern pthread_rwlock_t shlib_rwlock; /* global shared lib load lock */
extern pthread_mutex_t handle_config_mutex;
extern struct handle_tracker handle_list;
extern uint8_t handle_list_init;
int _is_valid_handle(knet_handle_t knet_h);
int _init_shlib_tracker(knet_handle_t knet_h);
void _fini_shlib_tracker(void);
/*
* NOTE: every single operation must be implementend
* for every protocol.
*/
/*
* for now knet supports only IP protocols (udp/sctp)
* in future there might be others like ARP
* or TIPC.
* keep this around as transport information
* to use for access lists and other operations
*/
#define TRANSPORT_PROTO_LOOPBACK 0
#define TRANSPORT_PROTO_IP_PROTO 1
/*
* some transports like SCTP can filter incoming
* connections before knet has to process
* any packets.
* GENERIC_ACL -> packet has to be read and filterted
* PROTO_ACL -> transport provides filtering at lower levels
* and packet does not need to be processed
*/
typedef enum {
USE_NO_ACL,
USE_GENERIC_ACL,
USE_PROTO_ACL
} transport_acl;
/*
* make it easier to map values in transports.c
*/
#define TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED 0
#define TRANSPORT_PROTO_IS_CONNECTION_ORIENTED 1
typedef struct knet_transport_ops {
/*
* transport generic information
*/
const char *transport_name;
const uint8_t transport_id;
const uint8_t built_in;
uint8_t transport_protocol;
transport_acl transport_acl_type;
/*
* connection oriented protocols like SCTP
* don“t need dst_addr in sendto calls and
* on some OSes are considered EINVAL.
*/
uint8_t transport_is_connection_oriented;
uint32_t transport_mtu_overhead;
/*
* transport init must allocate the new transport
* and perform all internal initializations
* (threads, lists, etc).
*/
int (*transport_init)(knet_handle_t knet_h);
/*
* transport free must releases _all_ resources
* allocated by tranport_init
*/
int (*transport_free)(knet_handle_t knet_h);
/*
* link operations should take care of all the
* sockets and epoll management for a given link/transport set
* transport_link_disable should return err = -1 and errno = EBUSY
* if listener is still in use, and any other errno in case
* the link cannot be disabled.
*
* set_config/clear_config are invoked in global write lock context
*/
int (*transport_link_set_config)(knet_handle_t knet_h, struct knet_link *link);
int (*transport_link_clear_config)(knet_handle_t knet_h, struct knet_link *link);
/*
* transport callback for incoming dynamic connections
* this is called in global read lock context
*/
int (*transport_link_dyn_connect)(knet_handle_t knet_h, int sockfd, struct knet_link *link);
/*
* per transport error handling of recvmmsg
* (see _handle_recv_from_links comments for details)
*/
/*
* transport_rx_sock_error is invoked when recvmmsg returns <= 0
*
* transport_rx_sock_error is invoked with both global_rdlock
*/
int (*transport_rx_sock_error)(knet_handle_t knet_h, int sockfd, int recv_err, int recv_errno);
/*
* transport_tx_sock_error is invoked with global_rwlock and
* it's invoked when sendto or sendmmsg returns =< 0
*
* it should return:
* -1 on internal error
* 0 ignore error and continue
* 1 retry
* any sleep or wait action should happen inside the transport code
*/
int (*transport_tx_sock_error)(knet_handle_t knet_h, int sockfd, int subsys, int recv_err, int recv_errno);
/*
* this function is called on _every_ received packet
* to verify if the packet is data or internal protocol error handling
*
* it should return:
* -1 on error
* 0 packet is not data and we should continue the packet process loop
* 1 packet is not data and we should STOP the packet process loop
* 2 packet is data and should be parsed as such
*
* transport_rx_is_data is invoked with both global_rwlock
* and fd_tracker read lock (from RX thread)
*/
int (*transport_rx_is_data)(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg);
/*
* this function is called by links.c when a link down event is recorded
* to notify the transport that packets are not going through, and give
* transport the opportunity to take actions.
*/
int (*transport_link_is_down)(knet_handle_t knet_h, struct knet_link *link);
} knet_transport_ops_t;
struct pretty_names {
const char *name;
uint8_t val;
};
#endif
|