1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
|
#ifndef _PASSENGER_BUFFERED_IO_H_
#define _PASSENGER_BUFFERED_IO_H_
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <string>
#include <utility>
#include <boost/function.hpp>
#include <oxt/system_calls.hpp>
#include <oxt/macros.hpp>
#include <FileDescriptor.h>
#include <Exceptions.h>
#include <StaticString.h>
#include <Utils/IOUtils.h>
namespace Passenger {
using namespace std;
using namespace oxt;
/**
* Provides buffered I/O for arbitrary file descriptors. Supports features not
* found in C++'s iostream or libc's stdio:
* - All functions have timeout support.
* - The readLine() method returns a C++ string, so no need to worry about
* buffer management. A size limit can be imposed.
* - Read buffer is infinite in size.
* - Unreading (pushing back) an arbitrary amount of data.
*/
class BufferedIO {
private:
FileDescriptor fd;
string buffer;
static pair<unsigned int, bool> nReadOrEofReached(const char *data,
unsigned int size, void *output, unsigned int goalSize, unsigned int *alreadyRead)
{
char *current = (char *) output + *alreadyRead;
unsigned int remaining = goalSize - *alreadyRead;
unsigned int consumed = min(remaining, size);
memcpy(current, data, consumed);
*alreadyRead += consumed;
return make_pair(consumed, *alreadyRead == goalSize);
}
static pair<unsigned int, bool> eofReached(const char *data,
unsigned int size, string *output)
{
output->append(data, size);
return make_pair(size, false);
}
static pair<unsigned int, bool> newlineFound(const char *data,
unsigned int size, string *output, unsigned int max)
{
const char *newline = (const char *) memchr(data, '\n', size);
if (newline != NULL) {
unsigned int accepted = newline - data + 1;
if (output->size() + accepted > max) {
throw SecurityException("Line too long");
}
output->append(data, accepted);
return make_pair(accepted, true);
} else {
if (output->size() + size > max) {
throw SecurityException("Line too long");
}
output->append(data, size);
return make_pair(size, false);
}
}
public:
typedef boost::function< pair<unsigned int, bool>(const char *data, unsigned int size) > AcceptFunction;
BufferedIO() { }
BufferedIO(const FileDescriptor &_fd)
: fd(_fd)
{ }
FileDescriptor getFd() const {
return fd;
}
const string &getBuffer() const {
return buffer;
}
/**
* This method keeps reading data in a loop, feeding each chunk to the given
* acceptor function, until the function says that it has consumed all data
* that it needs. Leftover data that has been read from the file descriptor
* but not consumed by the acceptor function will be put in the buffer, making
* it available for future read operations.
*
* The acceptor function accepts (data, size) as arguments and returns a
* (consumed, done) pair, where 'consumed' indicates the number of bytes
* from 'data' that it has consumed. 'done' indicates whether the acceptor
* function is done consuming (true), or whether it expects more data (false).
*
* readUntil() can be used for e.g. reading data until a newline is encountered.
*
* If the acceptor function throws an exception then the BufferedIO instance
* will be left in an undefined state, making it unusable.
*
* @throws RuntimeException The acceptor function returned an invalid result.
* @throws SystemException
* @throws TimeoutException
* @throws boost::thread_interrupted
*/
unsigned int readUntil(const AcceptFunction &acceptor, unsigned long long *timeout = NULL) {
pair<unsigned int, bool> acceptResult;
unsigned int totalRead = 0;
if (!buffer.empty()) {
acceptResult = acceptor(buffer.c_str(), buffer.size());
if (OXT_UNLIKELY(!acceptResult.second && acceptResult.first < buffer.size())) {
throw RuntimeException("Acceptor function cannot return (x,false) where x is smaller than the input size");
} else if (OXT_UNLIKELY(acceptResult.first > buffer.size())) {
throw RuntimeException("Acceptor function cannot return a larger accept count than the input size");
}
buffer.erase(0, acceptResult.first);
totalRead = acceptResult.first;
if (acceptResult.second) {
return totalRead;
}
}
while (true) {
if (OXT_UNLIKELY(timeout != NULL && !waitUntilReadable(fd, timeout))) {
throw TimeoutException("Read timeout");
}
char tmp[1024 * 8];
ssize_t ret = syscalls::read(fd, tmp, sizeof(tmp));
if (ret == 0) {
return totalRead;
} else if (OXT_UNLIKELY(ret == -1)) {
if (errno != EAGAIN) {
int e = errno;
throw SystemException("read() failed", e);
}
} else {
acceptResult = acceptor(tmp, ret);
totalRead += acceptResult.first;
if (OXT_UNLIKELY(!acceptResult.second && acceptResult.first < (unsigned int) ret)) {
throw RuntimeException("Acceptor function cannot return (x,false) where x is smaller than the input size");
} else if (OXT_UNLIKELY(acceptResult.first > (unsigned int) ret)) {
throw RuntimeException("Acceptor function cannot return a larger accept count than the input size");
}
if (acceptResult.second) {
buffer.assign(tmp + acceptResult.first, ret - acceptResult.first);
return totalRead;
}
}
}
}
unsigned int read(void *buf, unsigned int size, unsigned long long *timeout = NULL) {
unsigned int counter = 0;
return readUntil(
boost::bind(nReadOrEofReached, _1, _2, buf, size, &counter),
timeout);
}
string readAll(unsigned long long *timeout = NULL) {
string output;
readUntil(boost::bind(eofReached, _1, _2, &output), timeout);
return output;
}
/**
* Reads a line and returns the line including the newline character. Upon
* encountering EOF, the empty string is returned.
*
* The `max` parameter dictates the maximum length of the returned line.
* If the line is longer than this number of characters, then a SecurityException
* is thrown, and the BufferedIO becomes unusable (enters an undefined state).
*
* @throws SystemException
* @throws TimeoutException
* @throws SecurityException
* @throws boost::thread_interrupted
*/
string readLine(unsigned int max = 1024, unsigned long long *timeout = NULL) {
string output;
readUntil(boost::bind(newlineFound, _1, _2, &output, max), timeout);
return output;
}
void unread(const void *buf, unsigned int size) {
string newBuffer;
newBuffer.reserve(size + buffer.size());
newBuffer.append((const char *) buf, (string::size_type) size);
newBuffer.append(buffer);
buffer = newBuffer;
}
void unread(const StaticString &str) {
unread(str.c_str(), str.size());
}
};
} // namespace Passenger
#endif /* _PASSENGER_BUFFERED_IO_H_ */
|