1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
|
#ifndef _MEDIA_SOCKET_H_
#define _MEDIA_SOCKET_H_
#include <glib.h>
#include <string.h>
#include <stdio.h>
#include "str.h"
#include "obj.h"
#include "helpers.h"
#include "dtls.h"
#include "crypto.h"
#include "socket.h"
#include "containers.h"
#include "types.h"
#include "xt_RTPENGINE.h"
#include "common_stats.h"
struct media_packet;
struct transport_protocol;
struct ssrc_entry_call;
struct rtpengine_srtp;
struct jb_packet;
struct poller;
struct media_player_cache_entry;
TYPED_GQUEUE(stream_fd, stream_fd)
typedef int rtcp_filter_func(struct media_packet *, GQueue *);
typedef int (*rewrite_func)(str *, struct packet_stream *, struct ssrc_entry_call *);
enum transport_protocol_index {
PROTO_RTP_AVP = 0,
PROTO_RTP_SAVP,
PROTO_RTP_AVPF,
PROTO_RTP_SAVPF,
PROTO_UDP_TLS_RTP_SAVP,
PROTO_UDP_TLS_RTP_SAVPF,
PROTO_UDPTL,
PROTO_RTP_SAVP_OSRTP,
PROTO_RTP_SAVPF_OSRTP,
PROTO_UNKNOWN,
__PROTO_LAST,
};
struct transport_protocol {
enum transport_protocol_index index;
const char *name;
enum transport_protocol_index avpf_proto;
enum transport_protocol_index osrtp_proto;
enum transport_protocol_index rtp_proto;
unsigned int rtp:1; /* also set to 1 for SRTP */
unsigned int srtp:1;
unsigned int osrtp:1;
unsigned int avpf:1;
unsigned int tcp:1;
};
extern const struct transport_protocol transport_protocols[];
struct streamhandler_io {
rewrite_func rtp_crypt;
rewrite_func rtcp_crypt;
rtcp_filter_func *rtcp_filter;
int (*kernel)(struct rtpengine_srtp *, struct packet_stream *);
};
struct streamhandler {
const struct streamhandler_io *in;
const struct streamhandler_io *out;
};
TYPED_GQUEUE(local_intf, struct local_intf)
TYPED_GHASHTABLE(rr_specs_ht, str, struct logical_intf, str_hash, str_equal, NULL, NULL)
struct logical_intf {
str name;
sockfamily_t *preferred_family;
local_intf_q list;
rr_specs_ht rr_specs;
str name_base; // if name is "foo:bar", this is "foo"
};
typedef void port_t;
TYPED_GQUEUE(ports, port_t)
struct socket_port_link {
socket_t socket;
ports_q links;
struct port_pool *pp;
};
TYPED_GQUEUE(port_pool, struct port_pool)
struct port_pool {
unsigned int min, max;
mutex_t free_list_lock;
ports_q free_ports_q; /* for getting the next free port */
ports_list **free_ports; /* for a lookup if the port is used */
port_pool_q overlaps;
};
#define free_ports_link(pp, port) ((pp)->free_ports[port - (pp)->min])
struct intf_address {
socktype_t *type;
sockaddr_t addr;
};
struct intf_config {
str name; // full name (before the '/' separator in config)
str name_base; // if name is "foo:bar", this is "foo"
str name_rr_spec; // if name is "foo:bar", this is "bar"
str alias; // if interface is "foo=bar", this is "bar"
struct intf_address local_address;
struct intf_address advertised_address;
unsigned int port_min, port_max;
GList *exclude_ports;
};
struct intf_spec {
struct intf_address local_address;
struct port_pool port_pool;
};
struct interface_sampled_rate_stats {
GHashTable *ht;
struct interface_stats_block intv;
};
INLINE void interface_sampled_calc_diff(const struct interface_sampled_stats *stats,
struct interface_sampled_stats *intv, struct interface_sampled_stats *diff)
{
#define F(x) STAT_SAMPLED_CALC_DIFF(x, stats, intv, diff)
#include "interface_sampled_stats_fields.inc"
#undef F
}
INLINE void interface_sampled_avg(struct interface_sampled_stats_avg *loc,
const struct interface_sampled_stats *diff) {
#define F(x) STAT_SAMPLED_AVG_STDDEV(x, loc, diff)
#include "interface_sampled_stats_fields.inc"
#undef F
}
INLINE void interface_counter_calc_diff(const struct interface_counter_stats *stats,
struct interface_counter_stats *intv, struct interface_counter_stats *diff) {
#define F(x) atomic64_calc_diff(&stats->x, &intv->x, &diff->x);
#include "interface_counter_stats_fields.inc"
#undef F
}
INLINE void interface_counter_calc_diff_dir(const struct interface_counter_stats_dir *stats,
struct interface_counter_stats_dir *intv, struct interface_counter_stats_dir *diff) {
#define F(x) atomic64_calc_diff(&stats->x, &intv->x, &diff->x);
#include "interface_counter_stats_fields_dir.inc"
#undef F
}
INLINE void interface_counter_calc_rate_from_diff(int64_t run_diff_us,
struct interface_counter_stats *diff, struct interface_counter_stats *rate) {
#define F(x) atomic64_calc_rate_from_diff(run_diff_us, atomic64_get(&diff->x), &rate->x);
#include "interface_counter_stats_fields.inc"
#undef F
}
INLINE void interface_counter_calc_rate_from_diff_dir(int64_t run_diff_us,
struct interface_counter_stats_dir *diff, struct interface_counter_stats_dir *rate) {
#define F(x) atomic64_calc_rate_from_diff(run_diff_us, atomic64_get(&diff->x), &rate->x);
#include "interface_counter_stats_fields_dir.inc"
#undef F
}
void interface_sampled_rate_stats_init(struct interface_sampled_rate_stats *);
void interface_sampled_rate_stats_destroy(struct interface_sampled_rate_stats *);
struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_sampled_rate_stats *s,
struct local_intf *lif, int64_t *time_diff_us);
TYPED_GQUEUE(socket_port, struct socket_port_link)
struct local_intf {
struct intf_spec *spec;
struct intf_address advertised_address;
unsigned int unique_id; /* starting with 0 - serves as preference */
struct logical_intf *logical;
str ice_foundation;
struct interface_stats_block *stats;
};
struct socket_intf_list {
struct local_intf *local_intf;
socket_port_q list;
};
struct sfd_intf_list {
struct local_intf *local_intf;
stream_fd_q list;
};
TYPED_GQUEUE(socket_intf_list, struct socket_intf_list) /* RO */
TYPED_GQUEUE(sfd_intf_list, struct sfd_intf_list)
/**
* stream_fd is an entry-point object for RTP packets handling,
* because of that it's also reference-counted.
*
* stream_fd object us only released, when it is removed from the poller
* and also removed from the call object.
*
* Contains an information required for media processing, such as media ports.
*/
struct stream_fd {
/* struct obj member must always be the first member in a struct.
*
* obj is created with a cleanup handler, see obj_alloc(),
* and this handler is executed whenever the reference count drops to zero.
*
* References are acquired and released through obj_get() and obj_put()
* (plus some other wrapper functions).
*/
struct obj obj;
unsigned int unique_id; /* RO */
union {
socket_t socket; /* RO - alias */
struct socket_port_link spl; /* RO */
};
struct local_intf *local_intf; /* RO */
/* stream_fd object holds a reference to the call it belongs to.
* Which in turn holds references to all stream_fd objects it contains,
* what makes these references circular.
*
* The call is only released when it has been dissociated from all stream_fd objects,
* which happens during call teardown.
*/
call_t *call; /* RO */
struct packet_stream *stream; /* LOCK: call->master_lock */
struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */
struct dtls_connection dtls; /* LOCK: stream->in_lock */
int error_strikes;
int active_read_events;
struct poller *poller;
};
struct sink_attrs {
// cannot be bit fields because G_STRUCT_OFFSET is used on them
bool block_media;
bool silence_media;
bool offer_answer:1; // bidirectional, exclusive
bool rtcp_only:1;
bool transcoding:1;
bool egress:1;
};
/**
* During actual packet handling and forwarding,
* only the sink_handler objects (and the packet_stream objects they are related to) are used.
*/
struct sink_handler {
struct packet_stream *sink;
const struct streamhandler *handler;
int kernel_output_idx;
struct sink_attrs attrs;
};
struct media_packet {
str raw;
endpoint_t fsin; // source address of received packet
int64_t tv; // timestamp when packet was received
stream_fd *sfd; // fd which received the packet
call_t *call; // sfd->call
struct packet_stream *stream; // sfd->stream
struct call_media *media; // stream->media
struct call_media *media_out; // output media
struct sink_handler sink;
struct media_player_cache_entry *cache_entry;
struct rtp_header *rtp;
struct rtcp_packet *rtcp;
struct ssrc_entry_call *ssrc_in, *ssrc_out; // SSRC contexts from in_srtp and out_srtp
str payload;
codec_packet_q packets_out;
int ptime; // returned from decoding
};
extern local_intf_q all_local_interfaces; // read-only during runtime
extern __thread struct bufferpool *media_bufferpool;
void interfaces_init(intf_config_q *interfaces);
void interfaces_free(void);
struct logical_intf *get_logical_interface(const str *name, sockfamily_t *fam, int num_ports);
struct local_intf *get_interface_address(const struct logical_intf *lif, sockfamily_t *fam);
struct local_intf *get_any_interface_address(const struct logical_intf *lif, sockfamily_t *fam);
void interfaces_exclude_port(endpoint_t *);
bool is_local_endpoint(const struct intf_address *addr, unsigned int port);
struct socket_port_link get_specific_port(unsigned int port,
struct intf_spec *spec, const str *label);
bool get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs,
struct call_media *media);
stream_fd *stream_fd_new(struct socket_port_link *, call_t *call, struct local_intf *lif);
stream_fd *stream_fd_lookup(const endpoint_t *);
void stream_fd_release(stream_fd *);
enum thread_looper_action release_closed_sockets(void);
void append_thread_lpr_to_glob_lpr(void);
void free_sfd_intf_list(struct sfd_intf_list *il);
void free_release_sfd_intf_list(struct sfd_intf_list *il);
void free_socket_intf_list(struct socket_intf_list *il);
void __unkernelize(struct packet_stream *, const char *);
void unkernelize(struct packet_stream *, const char *);
void __stream_unconfirm(struct packet_stream *, const char *);
void __reset_sink_handlers(struct packet_stream *);
int __hunt_ssrc_ctx_idx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx);
struct ssrc_entry_call *__hunt_ssrc_ctx(uint32_t ssrc, struct ssrc_entry_call *list[RTPE_NUM_SSRC_TRACKING],
unsigned int start_idx);
void media_packet_copy(struct media_packet *, const struct media_packet *);
void media_packet_release(struct media_packet *);
int media_socket_dequeue(struct media_packet *mp, struct packet_stream *sink);
const struct streamhandler *determine_handler(const struct transport_protocol *in_proto,
struct call_media *out_media, bool must_recrypt);
int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp);
const struct transport_protocol *transport_protocol(const str *s);
//void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered);
void play_buffered(struct jb_packet *cp);
INLINE int proto_is_rtp(const struct transport_protocol *protocol) {
// known to be RTP? therefore unknown is not RTP
if (!protocol)
return 0;
return protocol->rtp ? 1 : 0;
}
INLINE int proto_is_not_rtp(const struct transport_protocol *protocol) {
// known not to be RTP? therefore unknown might be RTP
if (!protocol)
return 0;
return protocol->rtp ? 0 : 1;
}
INLINE int proto_is(const struct transport_protocol *protocol, enum transport_protocol_index idx) {
if (!protocol)
return 0;
return (protocol->index == idx) ? 1 : 0;
}
INLINE void stream_fd_put(stream_fd *sp) {
if (!sp)
return;
obj_put(sp);
}
G_DEFINE_AUTOPTR_CLEANUP_FUNC(stream_fd, stream_fd_put)
#endif
|