1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
|
#if defined(__sun__) && defined(__svr4__)
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <port.h>
#include <sys/port_impl.h>
#endif
#include <unistd.h>
#include "mplexer.hh"
#include "sstuff.hh"
#include <iostream>
#include "misc.hh"
#include "namespaces.hh"
/**
 * FDMultiplexer implementation backed by a Solaris event port
 * (port_create / port_associate / port_dissociate / port_getn).
 */
class PortsFDMultiplexer : public FDMultiplexer
{
public:
  /** Creates the event port; throws FDMultiplexerException if that fails. */
  PortsFDMultiplexer();

  /** Closes the event port descriptor. */
  virtual ~PortsFDMultiplexer()
  {
    close(d_portfd);
  }

  /** Waits up to `timeout` milliseconds for events and runs the matching callbacks. */
  virtual int run(struct timeval* tv, int timeout=500);

  /** Records the callback for `fd` in `cbmap` and associates `fd` with the port. */
  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter);

  /** Drops `fd` from `cbmap` and dissociates it from the port. */
  virtual void removeFD(callbackmap_t& cbmap, int fd);

  /** Human-readable name of this multiplexer implementation. */
  string getName()
  {
    return "solaris completion ports";
  }

private:
  int d_portfd;                                 // descriptor returned by port_create()
  boost::shared_array<port_event_t> d_pevents;  // buffer that port_getn() fills, s_maxevents entries
  static int s_maxevents;                       // not a hard maximum
};
// Factory function registered with the multiplexer map: allocates a new
// PortsFDMultiplexer and hands the raw pointer to the caller.
static FDMultiplexer* makePorts()
{
  FDMultiplexer* const instance = new PortsFDMultiplexer();
  return instance;
}
// Static registrar: constructing `doItPorts` during static initialization
// inserts the makePorts factory into the multiplexer map.
static struct PortsRegisterOurselves
{
  PortsRegisterOurselves()
  {
    const int priority = 0; // priority 0!
    FDMultiplexer::getMultiplexerMap().insert(make_pair(priority, &makePorts));
  }
} doItPorts;
// Number of entries in the event buffer handed to port_getn(); not a hard maximum.
int PortsFDMultiplexer::s_maxevents = 1024;

// Allocates the event buffer, then creates the event port.
// Throws FDMultiplexerException when port_create() reports failure.
PortsFDMultiplexer::PortsFDMultiplexer() : d_pevents(new port_event_t[s_maxevents])
{
  d_portfd = port_create();
  if (d_portfd >= 0) {
    return;
  }
  throw FDMultiplexerException("Setting up port: "+stringerror());
}
// Registers `toDo`/`parameter` for `fd` in `cbmap`, then associates the fd with
// the event port: POLLIN when `cbmap` is the read-callback map, POLLOUT otherwise.
// If the association fails, the fd is erased from `cbmap` again and an
// FDMultiplexerException is thrown.
void PortsFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter)
{
  accountingAddFD(cbmap, fd, toDo, parameter);

  int events = POLLOUT;
  if (&cbmap == &d_readCallbacks) {
    events = POLLIN;
  }

  const int rc = port_associate(d_portfd, PORT_SOURCE_FD, fd, events, 0);
  if (rc >= 0) {
    return;
  }

  // Undo the bookkeeping before reporting the failure.
  cbmap.erase(fd);
  throw FDMultiplexerException("Adding fd to port set: "+stringerror());
}
// Removes `fd` from `cbmap` and dissociates it from the event port.
// Throws FDMultiplexerException if the fd was not listed in `cbmap`, or if
// port_dissociate() fails with an errno other than ENOENT.
void PortsFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
{
  const auto erased = cbmap.erase(fd);
  if (erased == 0) {
    throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
  }

  const int rc = port_dissociate(d_portfd, PORT_SOURCE_FD, fd);
  // it appears under some circumstances, ENOENT will be returned, without
  // this being an error. Apache has this same "fix"
  if (rc >= 0 || errno == ENOENT) {
    return;
  }
  throw FDMultiplexerException("Removing fd from port set: "+stringerror());
}
/**
 * Waits for events on the port and runs the callbacks registered for the
 * descriptors that fired.
 *
 * @param now      Filled via gettimeofday() once the wait has returned.
 * @param timeout  Maximum time to wait, in milliseconds.
 * @return 0 when no event was retrieved (timeout or EINTR), otherwise the
 *         number of events retrieved from the port.
 * @throws FDMultiplexerException if run() is entered while it is already
 *         running, if port_getn() fails with an errno other than ETIME/EINTR,
 *         or if a descriptor cannot be associated with the port again after
 *         its callback ran. Exceptions thrown by callbacks propagate.
 */
int PortsFDMultiplexer::run(struct timeval* now, int timeout)
{
  if(d_inrun) {
    throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
  }

  // Convert the millisecond timeout into the timespec port_getn() expects.
  struct timespec timeoutspec;
  timeoutspec.tv_sec = timeout / 1000;
  timeoutspec.tv_nsec = (timeout % 1000) * 1000000;

  unsigned int numevents=1;
  int ret= port_getn(d_portfd, d_pevents.get(), min(PORT_MAX_LIST, s_maxevents), &numevents, &timeoutspec);

  /* port_getn has an unusual API - (ret == -1, errno == ETIME) can
     mean partial success; you must check (*numevents) in this case
     and process anything in there, otherwise you'll never see any
     events from that object again. We don't care about pure timeouts
     (ret == -1, errno == ETIME, *numevents == 0) so we don't bother
     with that case. */
  if(ret == -1 && errno!=ETIME) {
    if(errno!=EINTR)
      throw FDMultiplexerException("completion port_getn returned error: "+stringerror());
    // EINTR is not really an error
    gettimeofday(now,nullptr);
    return 0;
  }
  gettimeofday(now,nullptr);

  if(!numevents) // nothing
    return 0;

  d_inrun=true;
  try {
    for(unsigned int n=0; n < numevents; ++n) {
      const auto object = d_pevents[n].portev_object;

      d_iter=d_readCallbacks.find(object);
      if(d_iter != d_readCallbacks.end()) {
        d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
        // Associate the fd with the port again, unless the callback removed it.
        if(d_readCallbacks.count(object) && port_associate(d_portfd, PORT_SOURCE_FD, object,
                                                           POLLIN, 0) < 0)
          throw FDMultiplexerException("Unable to add fd back to ports (read): "+stringerror());
        continue; // so we don't find ourselves as writable again
      }

      d_iter=d_writeCallbacks.find(object);
      if(d_iter != d_writeCallbacks.end()) {
        d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
        // Associate the fd with the port again, unless the callback removed it.
        if(d_writeCallbacks.count(object) && port_associate(d_portfd, PORT_SOURCE_FD, object,
                                                            POLLOUT, 0) < 0)
          throw FDMultiplexerException("Unable to add fd back to ports (write): "+stringerror());
      }
    }
  }
  catch(...) {
    // Clear the flag on the way out; otherwise one failing callback would
    // make every later run() call fail the reentrancy check above.
    d_inrun=false;
    throw;
  }
  d_inrun=false;
  return numevents;
}
#if 0
// Sample read callback for the disabled (#if 0) test driver below.
// Prints the fd, receives one packet from the Socket pointer carried in
// `parameter`, and prints the packet size.
void acceptData(int fd, boost::any& parameter)
{
cout<<"Have data on fd "<<fd<<endl;
// `parameter` must hold a Socket*; any_cast on a reference throws otherwise.
Socket* sock=boost::any_cast<Socket*>(parameter);
string packet;
IPEndpoint rem;
sock->recvFrom(packet, rem);
cout<<"Received "<<packet.size()<<" bytes!\n";
}
// Disabled (#if 0) manual test driver: binds an AF_INET datagram socket to
// 0.0.0.0:2000, registers acceptData as its read callback, runs the
// multiplexer 100 times and then removes the socket again.
int main()
{
Socket s(AF_INET, SOCK_DGRAM);
IPEndpoint loc("0.0.0.0", 2000);
s.bind(loc);
PortsFDMultiplexer sfm;
sfm.addReadFD(s.getHandle(), &acceptData, &s);
for(int n=0; n < 100 ; ++n) {
// NOTE(review): run() as declared in PortsFDMultiplexer takes a timeval*
// with no default, so this call looks stale - confirm before enabling.
sfm.run();
}
sfm.removeReadFD(s.getHandle());
// NOTE(review): second removal of the same handle - presumably meant to
// exercise the "unlisted fd" exception path; confirm.
sfm.removeReadFD(s.getHandle());
}
#endif
|