File: stream_pipe.c

package info (click to toggle)
getstream 20070419-1
  • links: PTS
  • area: main
  • in suites: lenny
  • size: 308 kB
  • ctags: 658
  • sloc: ansic: 4,137; makefile: 58
file content (135 lines) | stat: -rw-r--r-- 3,226 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
 *
 * Stream PIPE output - Stream a program to a FIFO for postprocessing
 *
 * One of the problems i could imaging is that for a program which does not get
 * and traffic e.g. TS Packets we'll never discover that the reader closed the pipe
 * as we never issue a write. This shouldnt be a problem but you are warned. Flo 2007-04-19
 *
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <signal.h>

#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

#include "getstream.h"

#define PIPE_MAX_PAYLOAD		2048
#define PIPE_INTERVAL			5

static int stream_pipe_tryopen(struct stream_s *s) {

	s->pipefd=open(s->filename, O_NONBLOCK|O_WRONLY);

	if (s->pipefd >= 0) {
		logwrite(LOG_INFO, "stream_pipe: starting to write to %s - got reader", s->filename);
		s->receiver++;
		return 1;
	}

	if (errno == ENXIO)
		return 0;

	logwrite(LOG_ERROR, "stream_pipe: failed to open fifo %s", s->filename);

	return 0;
}

static void stream_pipe_open_event(int fd, short event, void *arg);

static void stream_pipe_event_init(struct stream_s *s) {
	static struct event	pevent;
	static struct timeval	tv;

	tv.tv_usec=0;
	tv.tv_sec=PIPE_INTERVAL;

	evtimer_set(&pevent, stream_pipe_open_event, s);
	evtimer_add(&pevent, &tv);
}

static void stream_pipe_open_event(int fd, short event, void *arg) {
	struct stream_s		*s=arg;

	/* Try opening - if it fails - retry */
	if (!stream_pipe_tryopen(s))
		stream_pipe_event_init(s);
}

static void stream_pipe_close(struct stream_s *s) {
	s->receiver--;
	close(s->pipefd);
	logwrite(LOG_INFO, "stream_pipe: closing %s - reader exited", s->filename);

	/* PIPE is closed - try opening on regular interval */
	stream_pipe_event_init(s);
}

int stream_init_pipe(struct stream_s *s) {
	struct stat		st;

	if (!lstat(s->filename, &st)) {
		if (!S_ISFIFO(st.st_mode)) {
			logwrite(LOG_ERROR, "stream_pipe: %s exists and is not a pipe", s->filename);
			return 0;
		}
	} else {
		/* if lstat fails we possibly have no fifo */
		/* FIXME we need to check errno for non existant */
		if (mknod(s->filename, S_IFIFO|0700, 0)) {
			logwrite(LOG_ERROR, "stream_pipe: failed to create fifo %s", s->filename);
			return 0;
		}
	}


	s->buffer=malloc(PIPE_MAX_PAYLOAD);

	if (!s->buffer)
		return 0;

	signal(SIGPIPE, SIG_IGN);

	stream_pipe_event_init(s);

	return 1;
}


void stream_send_pipe(struct stream_s *s, uint8_t *tsp) {
	int	len;

	/* Copy TS packet to packet buffer */
	memcpy(&s->buffer[s->buffervalid], tsp, TS_PACKET_SIZE);
	s->buffervalid+=TS_PACKET_SIZE;

	/* check whether another packet would fit ? */
	if (s->buffervalid + TS_PACKET_SIZE > PIPE_MAX_PAYLOAD) {
		/* Send packet and reset valid counter */
		len=write(s->pipefd, s->buffer, s->buffervalid);

		if (len < 0) {
			if (errno == EPIPE)
				stream_pipe_close(s);
			return;
		}

		/* FIXME - We might want to do more graceful here. We tried
		 * writing multiple TS packets to the FIFO and it failed. We
		 * now discard ALL packets in our buffer so we might loose some.
		 * A more graceful way would be to retry writing.
		 * Use libevent to get a callback when the reader is ready ?
		 */

		s->buffervalid=0;
	}
}