1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
/*
 *
 * Stream PIPE output - Stream a program to a FIFO for postprocessing
 *
 * One of the problems I could imagine is that for a program which does not get
 * any traffic (e.g. TS packets) we will never discover that the reader closed
 * the pipe, as we never issue a write. This shouldn't be a problem, but you
 * are warned. Flo 2007-04-19
 *
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <signal.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "getstream.h"
/*
 * Size in bytes of the per-stream packet buffer allocated in
 * stream_init_pipe(); stream_send_pipe() flushes the buffer once another
 * TS packet would no longer fit below this limit.
 */
#define PIPE_MAX_PAYLOAD 2048
/* Seconds to wait (timer tv_sec) before the next attempt to open the FIFO */
#define PIPE_INTERVAL 5
/*
 * Attempt to open the stream's FIFO for non-blocking writing.
 *
 * The result of open() is always stored in s->pipefd, including the
 * negative value on failure. On success the stream's receiver count is
 * incremented.
 *
 * Returns 1 if the FIFO was opened, 0 otherwise. ENXIO is not logged: for
 * a non-blocking write-only open of a FIFO it only means that no reader
 * has the FIFO open yet.
 */
static int stream_pipe_tryopen(struct stream_s *s) {
	int	fd;

	fd=open(s->filename, O_WRONLY|O_NONBLOCK);
	s->pipefd=fd;

	if (fd < 0) {
		/* errno still belongs to open() - nothing else ran in between */
		if (errno != ENXIO)
			logwrite(LOG_ERROR, "stream_pipe: failed to open fifo %s", s->filename);
		return 0;
	}

	logwrite(LOG_INFO, "stream_pipe: starting to write to %s - got reader", s->filename);
	s->receiver++;

	return 1;
}
static void stream_pipe_open_event(int fd, short event, void *arg);
static void stream_pipe_event_init(struct stream_s *s) {
static struct event pevent;
static struct timeval tv;
tv.tv_usec=0;
tv.tv_sec=PIPE_INTERVAL;
evtimer_set(&pevent, stream_pipe_open_event, s);
evtimer_add(&pevent, &tv);
}
/*
 * Timer callback: try to open the FIFO of the stream passed in arg.
 * If the open does not succeed, the retry timer is armed again.
 * fd and event are part of the callback signature and are not used.
 */
static void stream_pipe_open_event(int fd, short event, void *arg) {
	struct stream_s	*stream=arg;

	/* Opened - no further retry needed */
	if (stream_pipe_tryopen(stream))
		return;

	/* Still not open - schedule the next attempt */
	stream_pipe_event_init(stream);
}
/*
 * Close the FIFO after the reader went away and start polling for a new
 * reader.
 *
 * Decrements the stream's receiver count, closes the descriptor and arms
 * the retry timer via stream_pipe_event_init().
 */
static void stream_pipe_close(struct stream_s *s) {
	s->receiver--;

	close(s->pipefd);
	/*
	 * Invalidate the descriptor. The number may be handed out again by
	 * the next open()/socket() in this process; leaving it in place
	 * would let later write() calls on s->pipefd hit that unrelated
	 * file. With -1 they fail with EBADF until the FIFO is reopened.
	 */
	s->pipefd=-1;

	logwrite(LOG_INFO, "stream_pipe: closing %s - reader exited", s->filename);

	/* PIPE is closed - try opening on regular interval */
	stream_pipe_event_init(s);
}
/*
 * Prepare a stream for FIFO output.
 *
 * Makes sure s->filename names a FIFO (creating it if the path does not
 * exist), allocates the PIPE_MAX_PAYLOAD byte packet buffer, ignores
 * SIGPIPE process-wide so a vanished reader shows up as EPIPE from
 * write() instead of killing the process, and arms the timer that
 * periodically tries to open the FIFO.
 *
 * Returns 1 on success, 0 on failure (path exists but is not a FIFO,
 * the path cannot be examined or created, or out of memory).
 */
int stream_init_pipe(struct stream_s *s) {
	struct stat	st;

	if (lstat(s->filename, &st) == 0) {
		if (!S_ISFIFO(st.st_mode)) {
			logwrite(LOG_ERROR, "stream_pipe: %s exists and is not a pipe", s->filename);
			return 0;
		}
	} else if (errno != ENOENT) {
		/* lstat failed for a reason other than "no such file" - creating
		 * a FIFO would not help, so report the real problem */
		logwrite(LOG_ERROR, "stream_pipe: failed to stat %s: %s",
			s->filename, strerror(errno));
		return 0;
	} else if (mknod(s->filename, S_IFIFO|0700, 0)) {
		logwrite(LOG_ERROR, "stream_pipe: failed to create fifo %s", s->filename);
		return 0;
	}

	s->buffer=malloc(PIPE_MAX_PAYLOAD);
	if (!s->buffer) {
		logwrite(LOG_ERROR, "stream_pipe: failed to allocate buffer for %s", s->filename);
		return 0;
	}

	/* Start from a known state instead of relying on how the caller
	 * initialised the structure: empty buffer, no open descriptor */
	s->buffervalid=0;
	s->pipefd=-1;

	signal(SIGPIPE, SIG_IGN);

	stream_pipe_event_init(s);

	return 1;
}
/*
 * Queue one TS packet for the FIFO and flush the buffer once it is full.
 *
 * s   - stream whose buffer and pipefd are used
 * tsp - TS packet; TS_PACKET_SIZE bytes are copied from it
 *
 * Packets are collected in s->buffer. As soon as one more packet would
 * not fit into PIPE_MAX_PAYLOAD bytes, the buffer is written to
 * s->pipefd in a single write(). If that write fails with EPIPE the
 * FIFO is closed through stream_pipe_close().
 */
void stream_send_pipe(struct stream_s *s, uint8_t *tsp) {
	ssize_t	len;

	/* Copy TS packet to packet buffer */
	memcpy(&s->buffer[s->buffervalid], tsp, TS_PACKET_SIZE);
	s->buffervalid+=TS_PACKET_SIZE;

	/* Another packet still fits - nothing to send yet */
	if (s->buffervalid + TS_PACKET_SIZE <= PIPE_MAX_PAYLOAD)
		return;

	len=write(s->pipefd, s->buffer, s->buffervalid);

	/*
	 * Reset the fill level no matter how the write went. Returning with
	 * a full buffer after a failed write (EPIPE, EAGAIN on the
	 * non-blocking FIFO, ...) would make the next call copy its packet
	 * past the end of the PIPE_MAX_PAYLOAD byte buffer.
	 *
	 * FIXME - We might want to be more graceful here. On a failed or
	 * short write we discard ALL packets in our buffer, so we might
	 * lose some. A more graceful way would be to retry writing.
	 * Use libevent to get a callback when the reader is ready ?
	 */
	s->buffervalid=0;

	/* The assignment above does not touch errno */
	if (len < 0 && errno == EPIPE)
		stream_pipe_close(s);
}
|