1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
/*
*
* Stream PIPE output - Stream a program to a FIFO for postprocessing
*
* One of the problems I could imagine is that for a program which does not get
* any traffic (e.g. TS packets) we'll never discover that the reader closed the pipe
* as we never issue a write. This shouldn't be a problem but you are warned. Flo 2007-04-19
*
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "output.h"
#include "simplebuffer.h"
#include "getstream.h"
/* First argument passed to sb_init() for the output buffer - presumably its capacity in bytes; verify against simplebuffer.h */
#define PIPE_BUFFER 32768
/* Seconds to wait between attempts to open the FIFO for writing (used as tv_sec of the retry timer) */
#define PIPE_INTERVAL 5
/* NOTE(review): not referenced in the visible code - looks like an intended limit on consecutive EAGAIN results; confirm or remove */
#define PIPE_MAXEAGAIN 10
/*
 * Try to open the FIFO named by o->pipe.filename for writing, without
 * blocking.
 *
 * Opening a FIFO with O_NONBLOCK|O_WRONLY fails with ENXIO as long as no
 * process has it open for reading. That is the expected "no reader yet"
 * answer, so it is not logged; every other failure is logged together
 * with errno, in the same format the write path uses.
 *
 * On success o->pipe.fd holds the descriptor, o->receiver is incremented
 * and sb_zap() is called on the buffer (presumably emptying it so the new
 * reader does not get stale data - see simplebuffer.h).
 *
 * Returns 1 if the FIFO was opened, 0 otherwise.
 */
static int output_pipe_tryopen(struct output_s *o) {
	int	err;

	o->pipe.fd=open(o->pipe.filename, O_NONBLOCK|O_WRONLY);

	if (o->pipe.fd >= 0) {
		logwrite(LOG_INFO, "stream_pipe: starting to write to %s - got reader", o->pipe.filename);
		o->receiver++;
		sb_zap(o->buffer);
		return 1;
	}

	/* Capture errno right away - later library calls may overwrite it */
	err=errno;

	/* No reader on the other end yet - the caller retries later */
	if (err == ENXIO)
		return 0;

	logwrite(LOG_ERROR, "stream_pipe: failed to open fifo %s - %d / %s",
		o->pipe.filename, err, strerror(err));

	return 0;
}
/* Timer callback, defined further down; it needs output_pipe_event_init() to re-arm itself */
static void output_pipe_open_event(int fd, short event, void *arg);

/*
 * Schedule output_pipe_open_event() to run for this output after
 * PIPE_INTERVAL seconds, using the timer event stored in o->pipe.event.
 */
static void output_pipe_event_init(struct output_s *o) {
	struct timeval	retry_in = { .tv_sec = PIPE_INTERVAL, .tv_usec = 0 };

	evtimer_set(&o->pipe.event, output_pipe_open_event, o);
	evtimer_add(&o->pipe.event, &retry_in);
}
/*
 * Timer callback: attempt to open the FIFO once more.
 *
 * arg is the struct output_s registered with the timer. fd and event are
 * part of the callback signature and are not used here.
 */
static void output_pipe_open_event(int fd, short event, void *arg) {
	struct output_s	*out=arg;

	(void) fd;
	(void) event;

	/* Opened successfully - nothing left to schedule */
	if (output_pipe_tryopen(out))
		return;

	/* Still not open - schedule another attempt */
	output_pipe_event_init(out);
}
/*
 * Close the FIFO after the reader went away (or fell too far behind) and
 * schedule periodic attempts to reopen it.
 *
 * The descriptor stored in o->pipe.fd is set to -1 once closed. Without
 * that the old number stays in the structure, and a later write() or
 * close() on it could hit an unrelated descriptor that has since reused
 * the same number. A call on an already closed pipe is a no-op: the
 * retry timer armed by the first call keeps re-arming itself until the
 * FIFO is open again, and o->receiver must not be decremented twice.
 */
static void output_pipe_close(struct output_s *o) {
	/* Already closed by an earlier call - nothing to undo */
	if (o->pipe.fd < 0)
		return;

	o->receiver--;

	close(o->pipe.fd);
	o->pipe.fd=-1;

	logwrite(LOG_INFO, "stream_pipe: closing %s - reader exited", o->pipe.filename);

	/* PIPE is closed - try opening on regular interval */
	output_pipe_event_init(o);
}
/*
 * Prepare a pipe output: make sure o->pipe.filename is a FIFO (creating
 * it when it does not exist), allocate the output buffer, ignore SIGPIPE
 * and schedule the first attempt to open the FIFO for writing.
 *
 * lstat() is used, so a symbolic link at that path is not followed and
 * is reported as "not a pipe".
 *
 * Returns 1 on success, 0 on failure (the reason is logged).
 */
int output_init_pipe(struct output_s *o) {
	struct stat	st;
	int		err;

	if (lstat(o->pipe.filename, &st) == 0) {
		if (!S_ISFIFO(st.st_mode)) {
			logwrite(LOG_ERROR, "stream_pipe: %s exists and is not a pipe", o->pipe.filename);
			return 0;
		}
	} else {
		err=errno;

		/*
		 * Only a missing path means we have to create the FIFO.
		 * Anything else (permissions, bad path component, ...)
		 * is a real error and creating would not help.
		 */
		if (err != ENOENT) {
			logwrite(LOG_ERROR, "stream_pipe: failed to stat %s - %d / %s",
				o->pipe.filename, err, strerror(err));
			return 0;
		}

		if (mknod(o->pipe.filename, S_IFIFO|0700, 0)) {
			err=errno;
			logwrite(LOG_ERROR, "stream_pipe: failed to create fifo %s - %d / %s",
				o->pipe.filename, err, strerror(err));
			return 0;
		}
	}

	o->buffer=sb_init(PIPE_BUFFER, 1, 0);
	if (!o->buffer) {
		logwrite(LOG_ERROR, "stream_pipe: failed to allocate buffer for %s", o->pipe.filename);
		return 0;
	}

	/*
	 * A write to a FIFO whose reader has gone raises SIGPIPE. Ignore
	 * it process-wide so the write fails with an error code instead,
	 * which output_send_pipe() handles.
	 */
	signal(SIGPIPE, SIG_IGN);

	/* The FIFO is opened later, from the timer, once a reader shows up */
	output_pipe_event_init(o);

	return 1;
}
/*
 * Queue one TS packet (TS_PACKET_SIZE bytes at tsp) for the FIFO and try
 * to write everything that is currently buffered.
 *
 * - If the buffer cannot take another packet the reader is considered
 *   too slow: the packet is not queued and the pipe is closed (it gets
 *   reopened by the retry timer).
 * - If write() reports EAGAIN or EINTR nothing was written; the data
 *   stays in the buffer and is retried together with the next packet.
 * - Any other write error closes the pipe.
 * - After a successful (possibly partial) write, sb_drop_atoms() is
 *   called with the number of bytes written.
 *
 * NOTE(review): this writes to o->pipe.fd unconditionally - presumably
 * the caller only invokes it while a reader is attached; verify.
 *
 * TODO: a more graceful design would get a libevent callback once the
 * reader is ready for more data instead of retrying on the next packet;
 * that might need a different buffer design, e.g. a ring buffer.
 */
void output_send_pipe(struct output_s *o, uint8_t *tsp) {
	ssize_t	len;
	int	err;

	if (sb_free_atoms(o->buffer) < TS_PACKET_SIZE) {
		logwrite(LOG_ERROR, "stream_pipe: buffer overflow - dropping");
		output_pipe_close(o);
		return;
	}

	sb_add_atoms(o->buffer, tsp, TS_PACKET_SIZE);

	len=write(o->pipe.fd, sb_bufptr(o->buffer), sb_buflen(o->buffer));

	if (len < 0) {
		/* Save errno before any other call gets a chance to change it */
		err=errno;

		/* Transient: pipe full or interrupted - keep the data for the next call */
		if (err == EAGAIN || err == EINTR)
			return;

		logwrite(LOG_ERROR, "stream_pipe: write returned %d / %s", err, strerror(err));
		output_pipe_close(o);
		return;
	}

	/* len is bounded by the buffer length, so it fits an int */
	sb_drop_atoms(o->buffer, (int) len);
}
|