File: BufferedIO.h

package info (click to toggle)
ruby-passenger 4.0.53-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 28,668 kB
  • ctags: 70,512
  • sloc: cpp: 264,280; ruby: 25,606; sh: 22,815; ansic: 18,277; python: 767; makefile: 99; perl: 20
file content (209 lines) | stat: -rw-r--r-- 6,671 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#ifndef _PASSENGER_BUFFERED_IO_H_
#define _PASSENGER_BUFFERED_IO_H_

#include <string>
#include <utility>
#include <algorithm>
#include <boost/function.hpp>
#include <oxt/system_calls.hpp>
#include <oxt/macros.hpp>
#include <cstring>
#include <FileDescriptor.h>
#include <Exceptions.h>
#include <StaticString.h>
#include <Utils/IOUtils.h>

namespace Passenger {

using namespace std;
using namespace oxt;


/**
 * Provides buffered I/O for arbitrary file descriptors. Supports features not
 * found in C++'s iostream or libc's stdio:
 * - All read functions have timeout support.
 * - The readLine() method returns a C++ string, so there is no need to worry
 *   about buffer management. A size limit can be imposed.
 * - The read buffer is unbounded in size.
 * - Unreading (pushing back) an arbitrary amount of data.
 */
class BufferedIO {
private:
	FileDescriptor fd;
	string buffer;

	static pair<unsigned int, bool> nReadOrEofReached(const char *data,
		unsigned int size, void *output, unsigned int goalSize, unsigned int *alreadyRead)
	{
		char *current = (char *) output + *alreadyRead;
		unsigned int remaining = goalSize - *alreadyRead;
		unsigned int consumed = min(remaining, size);
		memcpy(current, data, consumed);
		*alreadyRead += consumed;
		return make_pair(consumed, *alreadyRead == goalSize);
	}

	static pair<unsigned int, bool> eofReached(const char *data,
		unsigned int size, string *output)
	{
		output->append(data, size);
		return make_pair(size, false);
	}

	static pair<unsigned int, bool> newlineFound(const char *data,
		unsigned int size, string *output, unsigned int max)
	{
		const char *newline = (const char *) memchr(data, '\n', size);
		if (newline != NULL) {
			unsigned int accepted = newline - data + 1;
			if (output->size() + accepted > max) {
				throw SecurityException("Line too long");
			}
			output->append(data, accepted);
			return make_pair(accepted, true);
		} else {
			if (output->size() + size > max) {
				throw SecurityException("Line too long");
			}
			output->append(data, size);
			return make_pair(size, false);
		}
	}

public:
	typedef boost::function< pair<unsigned int, bool>(const char *data, unsigned int size) > AcceptFunction;

	BufferedIO() { }

	BufferedIO(const FileDescriptor &_fd)
		: fd(_fd)
		{ }

	FileDescriptor getFd() const {
		return fd;
	}

	const string &getBuffer() const {
		return buffer;
	}

	/**
	 * This method keeps reading data in a loop, feeding each chunk to the given
	 * acceptor function, until the function says that it has consumed all data
	 * that it needs. Leftover data that has been read from the file descriptor
	 * but not consumed by the acceptor function will be put in the buffer, making
	 * it available for future read operations.
	 *
	 * The acceptor function accepts (data, size) as arguments and returns a
	 * (consumed, done) pair, where 'consumed' indicates the number of bytes
	 * from 'data' that it has consumed. 'done' indicates whether the acceptor
	 * function is done consuming (true), or whether it expects more data (false).
	 *
	 * readUntil() can be used for e.g. reading data until a newline is encountered.
	 *
	 * If the acceptor function throws an exception then the BufferedIO instance
	 * will be left in an undefined state, making it unusable.
	 *
	 * @throws RuntimeException The acceptor function returned an invalid result.
	 * @throws SystemException
	 * @throws TimeoutException
	 * @throws boost::thread_interrupted
	 */
	unsigned int readUntil(const AcceptFunction &acceptor, unsigned long long *timeout = NULL) {
		pair<unsigned int, bool> acceptResult;
		unsigned int totalRead = 0;

		if (!buffer.empty()) {
			acceptResult = acceptor(buffer.c_str(), buffer.size());
			if (OXT_UNLIKELY(!acceptResult.second && acceptResult.first < buffer.size())) {
				throw RuntimeException("Acceptor function cannot return (x,false) where x is smaller than the input size");
			} else if (OXT_UNLIKELY(acceptResult.first > buffer.size())) {
				throw RuntimeException("Acceptor function cannot return a larger accept count than the input size");
			}
			buffer.erase(0, acceptResult.first);
			totalRead = acceptResult.first;
			if (acceptResult.second) {
				return totalRead;
			}
		}

		while (true) {
			if (OXT_UNLIKELY(timeout != NULL && !waitUntilReadable(fd, timeout))) {
				throw TimeoutException("Read timeout");
			}

			char tmp[1024 * 8];
			ssize_t ret = syscalls::read(fd, tmp, sizeof(tmp));
			if (ret == 0) {
				return totalRead;
			} else if (OXT_UNLIKELY(ret == -1)) {
				if (errno != EAGAIN) {
					int e = errno;
					throw SystemException("read() failed", e);
				}
			} else {
				acceptResult = acceptor(tmp, ret);
				totalRead += acceptResult.first;
				if (OXT_UNLIKELY(!acceptResult.second && acceptResult.first < (unsigned int) ret)) {
					throw RuntimeException("Acceptor function cannot return (x,false) where x is smaller than the input size");
				} else if (OXT_UNLIKELY(acceptResult.first > (unsigned int) ret)) {
					throw RuntimeException("Acceptor function cannot return a larger accept count than the input size");
				}
				if (acceptResult.second) {
					buffer.assign(tmp + acceptResult.first, ret - acceptResult.first);
					return totalRead;
				}
			}
		}
	}

	unsigned int read(void *buf, unsigned int size, unsigned long long *timeout = NULL) {
		unsigned int counter = 0;
		return readUntil(
			boost::bind(nReadOrEofReached, _1, _2, buf, size, &counter),
			timeout);
	}

	string readAll(unsigned long long *timeout = NULL) {
		string output;
		readUntil(boost::bind(eofReached, _1, _2, &output), timeout);
		return output;
	}

	/**
	 * Reads a line and returns the line including the newline character. Upon
	 * encountering EOF, the empty string is returned.
	 *
	 * The `max` parameter dictates the maximum length of the returned line.
	 * If the line is longer than this number of characters, then a SecurityException
	 * is thrown, and the BufferedIO becomes unusable (enters an undefined state).
	 *
	 * @throws SystemException
	 * @throws TimeoutException
	 * @throws SecurityException
	 * @throws boost::thread_interrupted
	 */
	string readLine(unsigned int max = 1024, unsigned long long *timeout = NULL) {
		string output;
		readUntil(boost::bind(newlineFound, _1, _2, &output, max), timeout);
		return output;
	}

	void unread(const void *buf, unsigned int size) {
		string newBuffer;
		newBuffer.reserve(size + buffer.size());
		newBuffer.append((const char *) buf, (string::size_type) size);
		newBuffer.append(buffer);
		buffer = newBuffer;
	}

	void unread(const StaticString &str) {
		unread(str.c_str(), str.size());
	}
};


} // namespace Passenger

#endif /* _PASSENGER_BUFFERED_IO_H_ */