1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
#include "stream.h"
#include <glib.h>
#include <pthread.h>
#include <unistd.h>
#include <limits.h>
#include <fcntl.h>
#include <libavcodec/avcodec.h>
#include "metafile.h"
#include "epoll.h"
#include "log.h"
#include "main.h"
#include "packet.h"
#include "forward.h"
#include "recaux.h"
// Largest number of bytes requested from a stream in a single read().
#define MAXBUFLEN 65535
// Fall back to a padding of zero when the libavcodec headers in use do not
// define one (or either) of the padding-size macros, so that ALLOCLEN below
// always expands to a valid expression.
#ifndef AV_INPUT_BUFFER_PADDING_SIZE
#define AV_INPUT_BUFFER_PADDING_SIZE 0
#endif
#ifndef FF_INPUT_BUFFER_PADDING_SIZE
#define FF_INPUT_BUFFER_PADDING_SIZE 0
#endif
// Size of each packet buffer allocation: the maximum read size plus whatever
// trailing padding the libavcodec headers ask for. Only the first MAXBUFLEN
// bytes are ever filled by read().
#define ALLOCLEN (MAXBUFLEN + AV_INPUT_BUFFER_PADDING_SIZE + FF_INPUT_BUFFER_PADDING_SIZE)
/*
 * Detach a stream from the poller and release its file descriptor.
 *
 * Does nothing if the stream has no open descriptor (fd == -1). Afterwards
 * stream->fd is reset to -1 so that repeated calls are harmless.
 *
 * The caller is expected to hold stream->lock.
 */
void stream_close(stream_t *stream) {
	if (stream->fd != -1) {
		// unregister first so the poller never refers to a closed descriptor
		epoll_del(stream->fd);
		close(stream->fd);
		stream->fd = -1;
	}
}
/*
 * Release the memory backing a stream object.
 *
 * Only the stream_t allocation itself is returned to GLib; the file
 * descriptor is not closed here (see stream_close()).
 * NOTE(review): stream->lock is not destroyed here either - confirm that this
 * is intentional.
 */
void stream_free(stream_t *stream)
{
	g_free(stream);
}
/*
 * Poll callback for a kernel stream: reads packets from stream->fd until the
 * descriptor has nothing more to offer, reaches EOF, or fails.
 *
 * handler->ptr must point to the stream_t that owns the handler (set up in
 * stream_open()).
 *
 * Locking: stream->lock is taken at the top of every iteration. It is held
 * while the descriptor is checked and read, and released before the packet is
 * handed on, so forwarding/decoding runs without the stream lock. Every
 * `break` leaves the loop with the lock still held; it is released once after
 * the loop.
 *
 * Buffer ownership: one buffer of ALLOCLEN bytes is allocated per packet.
 * forward_packet() leaves it intact; packet_process() takes ownership of it.
 * If decoding is disabled, or the loop is left before a packet was handed
 * over, the buffer is freed here.
 */
static void stream_handler(handler_t *handler) {
	stream_t *stream = handler->ptr;
	unsigned char *buf = NULL;

	// tag log output produced below with the call and stream names
	log_info_call = stream->metafile->name;
	log_info_stream = stream->name;

	//dbg("poll event for %s", stream->name);

	while (true) {
		pthread_mutex_lock(&stream->lock);

		// stream was closed in the meantime
		if (stream->fd == -1)
			break;

		buf = malloc(ALLOCLEN);
		if (!buf) {
			// Never hand a NULL buffer to read(). Treat allocation failure
			// like any other fatal read error on this stream.
			ilog(LOG_ERR, "Out of memory while reading from stream %s", stream->name);
			stream_close(stream);
			break;
		}

		int ret = read(stream->fd, buf, MAXBUFLEN);
		if (ret == 0) {
			ilog(LOG_INFO, "EOF on stream %s", stream->name);
			stream_close(stream);
			break;
		}
		if (ret < 0) {
			// nothing to read right now, or interrupted: leave the stream open
			if (errno == EAGAIN || errno == EINTR || errno == EWOULDBLOCK)
				break;
			ilog(LOG_INFO, "Read error on stream %s: %s", stream->name, strerror(errno));
			stream_close(stream);
			break;
		}

		// got a packet
		pthread_mutex_unlock(&stream->lock);

		if (forward_to) {
			if (forward_packet(stream->metafile, buf, ret)) // leaves buf intact
				__atomic_add_fetch(&stream->metafile->forward_failed, 1, __ATOMIC_RELAXED);
			else
				__atomic_add_fetch(&stream->metafile->forward_count, 1, __ATOMIC_RELAXED);
		}

		if (decoding_enabled)
			packet_process(stream, buf, ret); // consumes buf
		else
			free(buf);
		// the buffer is no longer ours in either case
		buf = NULL;
	}

	// all exits from the loop happen with the lock held
	pthread_mutex_unlock(&stream->lock);

	// frees a buffer that was allocated but never handed over; no-op on NULL
	free(buf);

	log_info_call = NULL;
	log_info_stream = NULL;
}
/*
 * Look up the stream with index `id` in the metafile, creating it on first
 * use.
 *
 * The metafile's stream array is grown as needed so that `id` is a valid
 * index. A newly created stream starts out zero-initialised, with no open
 * descriptor (fd == -1), an unset tag ((unsigned long) -1) and its start time
 * set to the current time.
 *
 * The caller is expected to hold the metafile lock.
 *
 * Returns the existing or newly created stream.
 */
static stream_t *stream_get(metafile_t *mf, unsigned long id) {
	if (mf->streams->len <= id)
		g_ptr_array_set_size(mf->streams, id + 1);

	stream_t *ret = g_ptr_array_index(mf->streams, id);
	if (ret)
		return ret;

	// Zero-fill the allocation: only a handful of fields are set explicitly
	// below, and the rest (name, handler, forwarding flag, channel slot, ...)
	// must not be left holding indeterminate values.
	ret = g_new0(stream_t, 1);
	g_ptr_array_index(mf->streams, id) = ret;

	pthread_mutex_init(&ret->lock, NULL);
	ret->fd = -1;
	ret->id = id;
	ret->metafile = mf;
	ret->tag = (unsigned long) -1;
	ret->start_time_us = now_us();

	return ret;
}
/*
 * Open the kernel stream `name` belonging to the metafile and register it
 * with the poller.
 *
 * The stream object for `id` is looked up (or created), its name is stored in
 * the metafile's string chunk, and the file
 * /proc/rtpengine/<ktable>/calls/<mf->parent>/<name> is opened read-only and
 * non-blocking. On success the descriptor is added to epoll with
 * stream_handler() as its callback.
 *
 * Failures (path too long, open() error) are logged and leave the stream
 * without a registered descriptor.
 *
 * The caller is expected to hold the metafile lock.
 * NOTE(review): a descriptor that is already open on this stream is
 * overwritten without being closed - confirm callers never open a stream
 * twice.
 */
void stream_open(metafile_t *mf, unsigned long id, char *name) {
	dbg("opening stream %lu/%s", id, name);

	stream_t *stream = stream_get(mf, id);

	stream->name = g_string_chunk_insert(mf->gsc, name);

	char fnbuf[PATH_MAX];
	int len = snprintf(fnbuf, sizeof(fnbuf), "/proc/rtpengine/%u/calls/%s/%s", ktable, mf->parent, name);
	if (len < 0 || (size_t) len >= sizeof(fnbuf)) {
		// a truncated path would name a different file; do not try to open it
		ilog(LOG_ERR, "Path of kernel stream %s is too long", name);
		return;
	}

	stream->fd = open(fnbuf, O_RDONLY | O_NONBLOCK);
	if (stream->fd == -1) {
		ilog(LOG_ERR, "Failed to open kernel stream %s: %s", fnbuf, strerror(errno));
		return;
	}

	// add to epoll
	stream->handler.ptr = stream;
	stream->handler.func = stream_handler;
	epoll_add(stream->fd, EPOLLIN, &stream->handler);
}
/*
 * Record the tag, SDP media ID and mixer channel slot for a stream, creating
 * the stream object if it does not exist yet.
 *
 * A channel slot that is not below mix_num_inputs is wrapped around (modulo
 * mix_num_inputs) and the adjustment is logged as an error.
 */
void stream_details(metafile_t *mf, unsigned long id, unsigned int tag, unsigned int media_sdp_id,
		unsigned int channel_slot)
{
	stream_t *stream = stream_get(mf, id);

	stream->tag = tag;
	stream->media_sdp_id = media_sdp_id;

	// common case: the requested slot is usable as-is
	if (channel_slot < mix_num_inputs) {
		stream->channel_slot = channel_slot;
		return;
	}

	// out of range: fold the slot back into the valid range and report it
	stream->channel_slot = channel_slot % mix_num_inputs;
	ilog(LOG_ERR, "Channel slot %u is greater than the maximum number of inputs %u, setting to %u",
			channel_slot, mix_num_inputs, stream->channel_slot);
}
/*
 * Switch the forwarding flag of a stream on or off, creating the stream
 * object if it does not exist yet.
 *
 * Any non-zero `on` enables the flag (stored as 1); zero disables it.
 */
void stream_forwarding_on(metafile_t *mf, unsigned long id, unsigned int on) {
	stream_t *target = stream_get(mf, id);

	dbg("Setting forwarding flag to %u for stream #%lu", on, target->id);

	// normalise to exactly 0 or 1
	target->forwarding_on = (on != 0);
}
|