1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
|
#pragma once
#include "mplexer.hh"
#include "tcpiohandler.hh"
/* Compile-time switch for the tracing statements used in this file: change the
   `#if 0` below to `#if 1` to define DEBUGLOG_ENABLED and make DEBUGLOG(x)
   stream `x` to cerr followed by endl; otherwise DEBUGLOG(x) expands to nothing.
   The enabled form uses unqualified `cerr` / `endl` and already ends with a
   semicolon, so those names must be visible wherever the macro is expanded. */
#if 0
#define DEBUGLOG_ENABLED
#define DEBUGLOG(x) cerr<<x<<endl;
#else
#define DEBUGLOG(x)
#endif
class IOStateHandler
{
public:
IOStateHandler(FDMultiplexer& mplexer, const int fd): d_mplexer(mplexer), d_fd(fd)
{
}
IOStateHandler(FDMultiplexer& mplexer): d_mplexer(mplexer), d_fd(-1)
{
}
~IOStateHandler()
{
/* be careful that this won't save us if the callback is still registered to the multiplexer,
because in that case the shared pointer count will never reach zero so this destructor won't
be called */
try {
reset();
}
catch (const FDMultiplexerException& e) {
/* that should not happen, but an exception raised from a destructor would be bad so better
safe than sorry */
}
}
bool isWaitingForRead() const
{
return d_isWaitingForRead;
}
bool isWaitingForWrite() const
{
return d_isWaitingForWrite;
}
void setSocket(int fd)
{
if (d_fd != -1) {
throw std::runtime_error("Trying to set the socket descriptor on an already initialized IOStateHandler");
}
d_fd = fd;
}
void reset()
{
update(IOState::Done);
}
std::string getState() const
{
std::string result("--");
result.reserve(2);
if (isWaitingForRead()) {
result.at(0) = 'R';
}
if (isWaitingForWrite()) {
result.at(1) = 'W';
}
return result;
}
void add(IOState iostate, FDMultiplexer::callbackfunc_t callback, FDMultiplexer::funcparam_t callbackData, boost::optional<struct timeval> ttd)
{
DEBUGLOG("in "<<__PRETTY_FUNCTION__<<" for fd "<<d_fd<<", last state was "<<getState()<<", adding "<<(int)iostate);
if (iostate == IOState::NeedRead) {
if (isWaitingForRead()) {
if (ttd) {
/* let's update the TTD ! */
d_mplexer.setReadTTD(d_fd, *ttd, /* we pass 0 here because we already have a TTD */0);
}
return;
}
d_mplexer.addReadFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
DEBUGLOG(__PRETTY_FUNCTION__<<": add read FD "<<d_fd);
d_isWaitingForRead = true;
}
else if (iostate == IOState::NeedWrite) {
if (isWaitingForWrite()) {
if (ttd) {
/* let's update the TTD ! */
d_mplexer.setWriteTTD(d_fd, *ttd, /* we pass 0 here because we already have a TTD */0);
}
return;
}
d_mplexer.addWriteFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
DEBUGLOG(__PRETTY_FUNCTION__<<": add write FD "<<d_fd);
d_isWaitingForWrite = true;
}
}
void update(IOState iostate, FDMultiplexer::callbackfunc_t callback = FDMultiplexer::callbackfunc_t(), FDMultiplexer::funcparam_t callbackData = boost::any(), boost::optional<struct timeval> ttd = boost::none)
{
DEBUGLOG("in "<<__PRETTY_FUNCTION__<<" for fd "<<d_fd<<", last state was "<<getState()<<" , new state is "<<(int)iostate);
if (isWaitingForRead() && iostate == IOState::Done) {
DEBUGLOG(__PRETTY_FUNCTION__<<": remove read FD "<<d_fd);
d_mplexer.removeReadFD(d_fd);
d_isWaitingForRead = false;
}
if (isWaitingForWrite() && iostate == IOState::Done) {
DEBUGLOG(__PRETTY_FUNCTION__<<": remove write FD "<<d_fd);
d_mplexer.removeWriteFD(d_fd);
d_isWaitingForWrite = false;
}
if (iostate == IOState::NeedRead) {
if (isWaitingForRead()) {
if (ttd) {
/* let's update the TTD ! */
d_mplexer.setReadTTD(d_fd, *ttd, /* we pass 0 here because we already have a TTD */0);
}
return;
}
if (isWaitingForWrite()) {
d_isWaitingForWrite = false;
d_mplexer.alterFDToRead(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
DEBUGLOG(__PRETTY_FUNCTION__<<": alter from write to read FD "<<d_fd);
}
else {
d_mplexer.addReadFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
DEBUGLOG(__PRETTY_FUNCTION__<<": add read FD "<<d_fd);
}
d_isWaitingForRead = true;
}
else if (iostate == IOState::NeedWrite) {
if (isWaitingForWrite()) {
if (ttd) {
/* let's update the TTD ! */
d_mplexer.setWriteTTD(d_fd, *ttd, /* we pass 0 here because we already have a TTD */0);
}
return;
}
if (isWaitingForRead()) {
d_isWaitingForRead = false;
d_mplexer.alterFDToWrite(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
DEBUGLOG(__PRETTY_FUNCTION__<<": alter from read to write FD "<<d_fd);
}
else {
d_mplexer.addWriteFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
DEBUGLOG(__PRETTY_FUNCTION__<<": add write FD "<<d_fd);
}
d_isWaitingForWrite = true;
}
else if (iostate == IOState::Done) {
DEBUGLOG(__PRETTY_FUNCTION__<<": done");
}
}
private:
FDMultiplexer& d_mplexer;
int d_fd;
bool d_isWaitingForRead{false};
bool d_isWaitingForWrite{false};
};
class IOStateGuard
{
public:
/* this class is using RAII to make sure we don't forget to release an IOStateHandler
from the IO multiplexer in case of exception / error handling */
IOStateGuard(std::unique_ptr<IOStateHandler>& handler): d_handler(handler), d_enabled(true)
{
}
~IOStateGuard()
{
/* if we are still owning the state when we go out of scope,
let's reset the state so it's not registered to the IO multiplexer anymore
and its reference count goes to zero */
if (d_enabled && d_handler) {
DEBUGLOG("IOStateGuard destroyed while holding a state, let's reset it");
try {
d_handler->reset();
}
catch (const FDMultiplexerException& e) {
/* that should not happen, but an exception raised from a destructor would be bad so better
safe than sorry */
}
d_enabled = false;
}
}
void release()
{
d_enabled = false;
}
private:
std::unique_ptr<IOStateHandler>& d_handler;
bool d_enabled;
};
|