File: com_stream.c

package info (click to toggle)
xblast-tnt 2.10.4-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,444 kB
  • sloc: ansic: 54,105; sh: 4,014; makefile: 129; sed: 16
file content (180 lines) | stat: -rw-r--r-- 5,778 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
/*
 * file com_stream.c - base struct und functions for stream connections
 *
 * $Id: com_stream.c,v 1.12 2006/02/09 21:21:23 fzago Exp $
 *
 * Program XBLAST
 * (C) by Oliver Vogel (e-mail: m.vogel@ndh.net)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published
 * by the Free Software Foundation; either version 2; or (at your option)
 * any later version
 *
 * This program is distributed in the hope that it will be entertaining,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILTY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
 * Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.
 * 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

#include "xblast.h"

/*
 * try to read telegrams from server
 */
static XBCommResult
ReadStream (XBComm * comm)
{
	XBTeleResult result;
	XBTelegram *tele;
	XBCommResult cResult;
	XBCommStream *stream = (XBCommStream *) comm;
	unsigned fd;
	assert (NULL != stream);
	/* get file descriptor for debug output */
	fd = Socket_Fd (stream->comm.socket);
	/* read data into queue */
	result = Net_Receive (stream->rcvQueue, stream->comm.socket);
	if (stream->prepFinish) {
		/* reading while waiting for eof */
		switch (result) {
		case XBT_R_EndOfFile:
			Dbg_Stream ("expected end of file on fd=%u\n", fd);
			break;
		case XBT_R_IOError:
			Dbg_Stream ("read error on fd=%u while waiting for eof\n", fd);
			break;
		default:
			Dbg_Stream ("successful read on fd=%u while waiting for eof\n", fd);
			break;
		}
		/* only stream remains to be removed, sends close to parent layer */
		Stream_CommFinish (stream);
		return XCR_OK;
	}
	else {
		/* active mode */
		switch (result) {
		case XBT_R_EndOfFile:
			Dbg_Stream ("unexpected end of file on fd=%u\n", fd);
			/* eof event to parent layer */
			(void)(*stream->eventFunc) (stream, XBST_EOF);
			return XCR_Finished;
		case XBT_R_IOError:
			Dbg_Stream ("read error on fd=%u\n", fd);
			/* state change to parent layer */
			assert (stream->eventFunc != NULL);
			(void)(*stream->eventFunc) (stream, XBST_IOREAD);
			return XCR_Error;
		default:
			Dbg_Stream ("successful read on fd=%u\n", fd);
		}
	}
	/* data in rcv queue, handle as much as possible */
	assert (stream->handleFunc != NULL);
	while (NULL != (tele = Net_ReceiveTelegram (stream->rcvQueue))) {
		/* handle a single message */
		cResult = (*stream->handleFunc) (stream, tele);
		/* message not needed anymore */
		Net_DeleteTelegram (tele);
		/* return if handling fails, otherwise continue */
		if (cResult != XCR_OK) {
			Dbg_Stream ("parse error on fd=%u, shutting down\n", fd);
			return cResult;
		}
	}
	return XCR_OK;
}								/* ReadStream */

/*
 * XBComm write handler for XBCommStream
 */
static XBCommResult
WriteStream (XBComm * comm)
{
	XBTeleResult result;
	XBCommStream *stream = (XBCommStream *) comm;
	unsigned fd;
	assert (NULL != stream);
	/* get file descriptor for debug output */
	fd = Socket_Fd (stream->comm.socket);
	/* send top element of send queue */
	result = Net_Send (stream->sndQueue, stream->comm.socket);
	switch (result) {
	case XBT_R_Complete:		/* queue has been emptied */
		Dbg_Stream ("sent all telegrams on fd=%u\n", fd);
		/* no more writing needed */
		Socket_UnregisterWrite (CommSocket (&stream->comm));
		/* state change to parent layer */
		assert (stream->eventFunc != NULL);
		(void)(*stream->eventFunc) (stream, XBST_WAIT);
		/* shutdown for empty queue, if asked for */
		if (stream->prepFinish) {
			Socket_ShutdownWrite (CommSocket (&stream->comm));
			Dbg_Stream ("socket shutdown for writing\n");
		}
		return XCR_OK;
	case XBT_R_IOError:		/* error while sending telegram */
		Dbg_Stream ("i/o-error write to fd=%u, shutting down\n", fd);
		/* state change to parent layer */
		assert (stream->eventFunc != NULL);
		(void)(*stream->eventFunc) (stream, XBST_IOWRITE);
		/* return error, deletes XBComm */
		return XCR_Error;
	default:					/* anything else */
		Dbg_Stream ("partial send on fd=%u\n", fd);
		/* state to parent layer */
		assert (stream->eventFunc != NULL);
		(void)(*stream->eventFunc) (stream, XBST_BUSY);
		return XCR_OK;
	}
}								/* WriteStream */

/*
 * add a XBCommStream
 */
void
Stream_CommInit (XBCommStream * stream, XBCommType commType, XBSocket * pSocket,
				 StreamHandleFunc handleFunc, StreamEventFunc eventFunc, XBCommFunc deleteFunc)
{
	assert (stream != NULL);
	/* add the underlying XBComm to internal list */
	CommInit (&stream->comm, commType, pSocket, ReadStream, WriteStream, deleteFunc);
	/* set stream specific handlers */
	stream->handleFunc = handleFunc;
	stream->eventFunc = eventFunc;
	/* flag: shutdown when send queue empty */
	stream->prepFinish = XBFalse;
	/* create queues */
	stream->sndQueue = Net_CreateSndQueue (commType == COMM_ToClient);
	stream->rcvQueue = Net_CreateRcvQueue (commType == COMM_ToClient);
	assert (NULL != stream->sndQueue);
	assert (NULL != stream->rcvQueue);
}								/* Stream_CommInit */

/*
 * remove a XBCommStream
 * does not free allocated XBComm memory!!
 */
void
Stream_CommFinish (XBCommStream * stream)
{
	/* debug output before socket is freed */
	Dbg_Stream ("removing stream on fd=%u\n", Socket_Fd (stream->comm.socket));
	/* remove XBComm from internal list, socket is freed */
	CommFinish (&stream->comm);
	/* free up queues */
	Net_DeleteSndQueue (stream->sndQueue);
	Net_DeleteRcvQueue (stream->rcvQueue);
	/* close event to parent layer */
	assert (stream->eventFunc != NULL);
	(void)(*stream->eventFunc) (stream, XBST_CLOSE);
}								/* Stream_CommFinish */

/*
 * end of file com_stream.c
 */