File: resolver.cpp

package info (click to toggle)
sigx 2.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 1,144 kB
  • ctags: 1,311
  • sloc: cpp: 3,103; ansic: 653; xml: 206; python: 65; makefile: 26
file content (187 lines) | stat: -rw-r--r-- 5,351 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#include <cstddef>
#include <string>
#include <iostream>
#include <algorithm>
#include <sigxconfig.h>
#ifdef SIGX_MSC
// windows and msvc++
#  if (_WIN32_WINNT >= 0x0501)
#  include <winsock2.h>
#  else
// must include for versions earlier than win xp
#  include <Wspiapi.h>
#  endif
#else
#  include <sys/socket.h> // AF_INET
#  include <arpa/inet.h>
#  include <netdb.h> // hostent, gethostbyaddr, ..
#endif
#include <sigx/tunnel_functor.h>
#include "resolver.h"
#include "resolver_p.h"

using namespace std;

#define DEBUG 1


IPResolverThread::ThreadData::ThreadData(): 
	m_msgQueue(), 
	m_connIdle(), 
	m_nIPPrev(), 
	m_strHost(), 
	m_bStopResolving(), 
	m_sigResolvingStopped(), 
	m_sigFinished(), 
	m_sigResolved()
{}

IPResolverThread::IPResolverThread(): 
	sigx::glib_threadable(), 
	m_ThreadData(), 
	// request api
	resolve(sigc::mem_fun(this, &IPResolverThread::on_marshall_resolving)), 
	stop_resolving(sigc::mem_fun(this, &IPResolverThread::on_stop_resolving)), 
	// signal api, signals live in threadprivate data
	signal_resolving_stopped(*this, m_ThreadData, &ThreadData::m_sigResolvingStopped), 
	signal_finished(*this, m_ThreadData, &ThreadData::m_sigFinished), 
	signal_resolved(*this, m_ThreadData, &ThreadData::m_sigResolved)
{}

//virtual 
void IPResolverThread::on_startup()
{
	// thread private pdata will be freed when the thread ends (according to the glib docs)
	m_ThreadData.set(new ThreadData);
}

//virtual 
void IPResolverThread::on_cleanup()
{
	ThreadData* pthreaddata = m_ThreadData.get();
	if (pthreaddata->m_connIdle.connected())
		pthreaddata->m_connIdle.disconnect();

	// tell others that I'm about to finish, even they might have joined me
	pthreaddata->m_sigFinished.emit();
}

void IPResolverThread::on_stop_resolving()
{
	ThreadData* pthreaddata = m_ThreadData.get();
	pthreaddata->m_bStopResolving = true;
	pthreaddata->m_sigResolvingStopped.emit();
}

void IPResolverThread::on_marshall_resolving(in_addr_t nIP)
{
	ThreadData* pthreaddata = m_ThreadData.get();
	
	if (pthreaddata->m_bStopResolving)
		return;

	const list<in_addr_t>::const_iterator it = 
		find(pthreaddata->m_msgQueue.begin(), pthreaddata->m_msgQueue.end(), nIP);
	if (it == pthreaddata->m_msgQueue.end())
	{	// if ip is not yet in the message queue, append it
		pthreaddata->m_msgQueue.push_back(nIP);
	}

	// wait until we've got all messages;
	// if there are still messages in the dispatcher queue,
	// defer resolving.
	// the main purpose is to optimize this thread: 
	// 1) resolving could take a longer time and there could be already
	// the "finish" message in the dispatcher queue. if we just blindly resolve
	// the next ip then we've got a problem.
	// 2) it could be that there are the same ips to resolve in the dispatcher
	// queue; we can skip them because everyone connected to the signal_resolved
	// gets the resolved ip and so we can speed up resolving
	// 3) if we would process messages all the time then this thread could end up in 
	// a denial of service, so watch out for finish
	// 
	// process other tunneled messages waiting in the dispatcher queue 
	// (the IPResolverThread is a dispatchable and thus has a dispatcher)
	if (dispatcher()->queued_contexts() > 0)
		return;
	
	// upon receiving the next idle signal resolve the next IP
	if (!pthreaddata->m_connIdle.connected())
	{
		pthreaddata->m_connIdle = maincontext()->signal_idle().connect(
			sigc::mem_fun(this, &IPResolverThread::resolve_next));
	}
}

bool IPResolverThread::resolve_next()
{
	ThreadData* pthreaddata = m_ThreadData.get();
	const in_addr_t nIP = pthreaddata->m_msgQueue.front();
	pthreaddata->m_msgQueue.pop_front();

	in_addr addr = {};
	addr.s_addr = nIP;
	int nErr(0);
	if ((pthreaddata->m_nIPPrev != nIP) || (pthreaddata->m_strHost.empty()))
	{	// if the current ip differs from the previous one
		// or ip couldn't be resolved before
		pthreaddata->m_nIPPrev = nIP;
		if (DEBUG)
		{
			cout << "Resolving: " << inet_ntoa(addr) << endl;
		}
		nErr = resolve_this(addr);
	}
	else if (DEBUG)
	{
		cout << "Resolved: " << inet_ntoa(addr) << " to " << pthreaddata->m_strHost << endl;
	}

	pthreaddata->m_sigResolved.emit(pthreaddata->m_strHost, nIP, nErr);
	
	// the IPResolverThread is a dispatchable and thus has a dispatcher reference
	//if ((m_disp_ptr->invoke()->queued_contexts() > 0)	|| 
	if ((dispatcher()->queued_contexts() > 0)	|| 
		(pthreaddata->m_msgQueue.empty()	)	)
	{
		// disconnect from idle signal if there are messages waiting
		// in the dispatcher queue. there could be a "finish"-signal
		// waiting;
		// also disconnect if there are no addresses to resolve anymore;
	 	// this method will be triggered again by the next ip to resolve;
		return false;
	}

	// otherwise stay connected;
	return true;
}

int IPResolverThread::resolve_this(in_addr addr)
{
	ThreadData* pthreaddata = m_ThreadData.get();
	int nErr(0);

#ifdef SIGC_MSC
	const hostent* phe = gethostbyaddr(inet_ntoa(addr), sizeof(in_addr), AF_INET);
#else
	const hostent* phe = gethostbyaddr(&addr, sizeof(in_addr), AF_INET);
#endif
	if (phe)
	{	// resolved?
		if (DEBUG)
		{
			cout << "Resolved: " << inet_ntoa(addr) << " to " << phe->h_name << endl;
		}
		pthreaddata->m_strHost = phe->h_name;
	}
	else
	{
		nErr = h_errno;
		if (DEBUG)
			cerr << "not resolvable, error " << nErr << endl;

		pthreaddata->m_strHost.clear();
	}

	return nErr;
}