1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
#include <cstddef>
#include <string>
#include <iostream>
#include <algorithm>
#include <sigxconfig.h>
#ifdef SIGX_MSC
// windows and msvc++
# if (_WIN32_WINNT >= 0x0501)
# include <winsock2.h>
# else
// must include for versions earlier than win xp
# include <Wspiapi.h>
# endif
#else
# include <sys/socket.h> // AF_INET
# include <arpa/inet.h>
# include <netdb.h> // hostent, gethostbyaddr, ..
#endif
#include <sigx/tunnel_functor.h>
#include "resolver.h"
#include "resolver_p.h"
using namespace std;
// Diagnostic switch: when non-zero, resolve_next() and resolve_this() print
// their progress to cout and resolution failures to cerr.
#define DEBUG 1
// Builds the resolver's thread-private state. Every member is initialized
// with an empty initializer; the constructor body itself does nothing.
IPResolverThread::ThreadData::ThreadData():
// IPs waiting to be resolved (filled by on_marshall_resolving, drained by resolve_next)
m_msgQueue(),
// connection to the main context's idle signal that drives resolve_next
m_connIdle(),
// IP handled by the most recent resolve_next call
m_nIPPrev(),
// host name produced by the most recent lookup; cleared when a lookup fails
m_strHost(),
// set to true by on_stop_resolving; further resolve requests are then ignored
m_bStopResolving(),
// emitted by on_stop_resolving
m_sigResolvingStopped(),
// emitted by on_cleanup
m_sigFinished(),
// emitted by resolve_next with (host name, ip, error code)
m_sigResolved()
{}
// Constructs the resolver thread object and wires up its public interface.
// The thread-private ThreadData instance is not created here; on_startup()
// allocates it and stores it in m_ThreadData.
IPResolverThread::IPResolverThread():
sigx::glib_threadable(),
m_ThreadData(),
// request api: each request member is bound to the member function that
// handles it (presumably tunnelled into the resolver thread by sigx -- confirm
// against the declarations in resolver.h)
resolve(sigc::mem_fun(this, &IPResolverThread::on_marshall_resolving)),
stop_resolving(sigc::mem_fun(this, &IPResolverThread::on_stop_resolving)),
// signal api, signals live in threadprivate data: each wrapper is given this
// object, the thread-private slot and a pointer to the ThreadData member
// holding the actual signal
signal_resolving_stopped(*this, m_ThreadData, &ThreadData::m_sigResolvingStopped),
signal_finished(*this, m_ThreadData, &ThreadData::m_sigFinished),
signal_resolved(*this, m_ThreadData, &ThreadData::m_sigResolved)
{}
//virtual
void IPResolverThread::on_startup()
{
// thread private pdata will be freed when the thread ends (according to the glib docs)
m_ThreadData.set(new ThreadData);
}
//virtual
void IPResolverThread::on_cleanup()
{
ThreadData* pthreaddata = m_ThreadData.get();
if (pthreaddata->m_connIdle.connected())
pthreaddata->m_connIdle.disconnect();
// tell others that I'm about to finish, even they might have joined me
pthreaddata->m_sigFinished.emit();
}
void IPResolverThread::on_stop_resolving()
{
ThreadData* pthreaddata = m_ThreadData.get();
pthreaddata->m_bStopResolving = true;
pthreaddata->m_sigResolvingStopped.emit();
}
// Handler for the resolve request: queues nIP for resolution (unless it is
// already queued) and, once the dispatcher has no more pending messages,
// attaches resolve_next to the idle signal.
//
// nIP: the address to resolve, handed on unchanged to resolve_next.
//
// Requests arriving after on_stop_resolving has run are dropped.
void IPResolverThread::on_marshall_resolving(in_addr_t nIP)
{
    ThreadData* const pData = m_ThreadData.get();

    if (pData->m_bStopResolving)
    {
        return;
    }

    // queue the ip only once; duplicates are dropped
    const bool bAlreadyQueued =
        find(pData->m_msgQueue.begin(), pData->m_msgQueue.end(), nIP) != pData->m_msgQueue.end();
    if (!bAlreadyQueued)
    {
        pData->m_msgQueue.push_back(nIP);
    }

    // Resolving is deferred while messages are still waiting in the
    // dispatcher queue (the IPResolverThread is a dispatchable and thus has
    // a dispatcher). Reasons:
    // 1) a lookup can take a long time and the "finish" message could already
    //    be waiting; resolving blindly would delay it;
    // 2) the same ip may be waiting several times; everyone connected to
    //    signal_resolved gets the result anyway, so duplicates can be skipped;
    // 3) processing lookups all the time could starve the message handling
    //    (denial of service), so watch out for finish.
    const bool bMessagesPending = dispatcher()->queued_contexts() > 0;

    // attach resolve_next to the idle signal, unless it is attached already;
    // the next idle signal then resolves the next ip
    if (!bMessagesPending && !pData->m_connIdle.connected())
    {
        pData->m_connIdle = maincontext()->signal_idle().connect(
            sigc::mem_fun(this, &IPResolverThread::resolve_next));
    }
}
// Idle handler: takes the next ip off the message queue, resolves it (or
// reuses the previous result if it is the same ip as last time and that lookup
// succeeded) and emits signal_resolved with (host name, ip, error code).
//
// Returns true to stay connected to the idle signal, false to be disconnected.
// false is returned when the queue is empty or when messages are waiting in
// the dispatcher queue; the next resolve request reconnects the handler.
bool IPResolverThread::resolve_next()
{
    ThreadData* pthreaddata = m_ThreadData.get();

    // nothing to do: front()/pop_front() on an empty list is undefined
    // behaviour, so just detach from the idle signal
    if (pthreaddata->m_msgQueue.empty())
        return false;

    const in_addr_t nIP = pthreaddata->m_msgQueue.front();
    pthreaddata->m_msgQueue.pop_front();

    in_addr addr = {};
    addr.s_addr = nIP;
    int nErr(0);

    if ((pthreaddata->m_nIPPrev != nIP) || (pthreaddata->m_strHost.empty()))
    {   // the current ip differs from the previous one
        // or the ip couldn't be resolved before
        pthreaddata->m_nIPPrev = nIP;
        if (DEBUG)
        {
            cout << "Resolving: " << inet_ntoa(addr) << endl;
        }
        nErr = resolve_this(addr);
    }
    else if (DEBUG)
    {   // same ip as last time and a host name is cached: reuse it
        cout << "Resolved: " << inet_ntoa(addr) << " to " << pthreaddata->m_strHost << endl;
    }

    pthreaddata->m_sigResolved.emit(pthreaddata->m_strHost, nIP, nErr);

    // the IPResolverThread is a dispatchable and thus has a dispatcher
    if ((dispatcher()->queued_contexts() > 0) ||
        (pthreaddata->m_msgQueue.empty()))
    {
        // disconnect from the idle signal if there are messages waiting
        // in the dispatcher queue; there could be a "finish"-signal waiting;
        // also disconnect if there are no addresses to resolve anymore;
        // this method will be triggered again by the next ip to resolve
        return false;
    }

    // otherwise stay connected
    return true;
}
// Performs a reverse lookup of addr with gethostbyaddr().
//
// addr: the IPv4 address to resolve, in the binary form gethostbyaddr expects.
//
// On success the host name is stored in the thread-private m_strHost and 0 is
// returned. On failure m_strHost is cleared and the value of h_errno is
// returned.
int IPResolverThread::resolve_this(in_addr addr)
{
    ThreadData* pthreaddata = m_ThreadData.get();
    int nErr(0);

    // gethostbyaddr() wants a pointer to the binary address, not its
    // dotted-decimal text form. Winsock declares the parameter as const char*,
    // POSIX as const void*; a const char* pointer satisfies both, so no
    // platform switch is needed.
    const hostent* phe = gethostbyaddr(
        reinterpret_cast<const char*>(&addr), sizeof(in_addr), AF_INET);

    if (phe)
    {   // resolved
        if (DEBUG)
        {
            cout << "Resolved: " << inet_ntoa(addr) << " to " << phe->h_name << endl;
        }
        pthreaddata->m_strHost = phe->h_name;
    }
    else
    {   // not resolvable: report the resolver error and forget any old name
        nErr = h_errno;
        if (DEBUG)
            cerr << "not resolvable, error " << nErr << endl;
        pthreaddata->m_strHost.clear();
    }
    return nErr;
}
|