// File: XnSocketInConnection.cpp
// Package: openni2 2.2.0.33+dfsg-11 (area: main; suite: buster)
// Package size: 22,216 kB; sloc: cpp 111,197; ansic 35,511; sh 10,542; python 1,313; java 952; makefile 575; xml 12
// File content: 282 lines, 8,516 bytes
#include "XnSocketInConnection.h"
#include "XnLinkProto.h"
#include "XnLinkProtoUtils.h"
#include "XnLinkDefs.h"
#include <XnOS.h>
#include <XnLog.h>

namespace xn
{

//const XnUInt32 SocketInConnection::BUFFER_NUM_PACKETS = 8*12;
// Number of max-size packets the receive buffer can hold: Init() sizes the buffer as
// (max packet size * BUFFER_NUM_PACKETS) and the read thread collects at most this
// many packets before handing the buffer to the data destination.
const XnUInt32 SocketInConnection::BUFFER_NUM_PACKETS = 1;
// Timeout passed to xnOSReceiveNetworkBuffer() for every receive attempt
// (presumably milliseconds - confirm against XnOS.h).
const XnUInt32 SocketInConnection::RECEIVE_TIMEOUT = 50;

// Timeout Connect() passes to xnOSWaitEvent() while waiting for the connect event.
const XnUInt32 SocketInConnection::CONNECT_TIMEOUT = 10000;
// Timeout Disconnect() passes to xnOSWaitAndTerminateThread() when stopping the read thread.
const XnUInt32 SocketInConnection::READ_THREAD_TERMINATE_TIMEOUT = 10000;

//TEMP TEMP TEMP
//const XnUInt32 SocketInConnection::CONNECT_TIMEOUT = XN_WAIT_INFINITE;
//const XnUInt32 SocketInConnection::READ_THREAD_TERMINATE_TIMEOUT = XN_WAIT_INFINITE;
//TEMP TEMP TEMP

// Creates a connection object in its idle state: no target address, no receive
// buffer, no read thread, and a connection status of "closed".
SocketInConnection::SocketInConnection()
{
	// Connection state
	m_nConnectionStatus = XN_STATUS_OS_NETWORK_CONNECTION_CLOSED;
	m_pDataDestination = NULL;

	// Target address
	m_nPort = 0;
	xnOSMemSet(m_strIP, 0, sizeof(m_strIP));

	// Receive buffer
	m_pBuffer = NULL;
	m_nBufferSize = 0;
	m_nMaxPacketSize = 0;

	// Read thread and its synchronization
	m_hReadThread = NULL;
	m_bStopReadThread = FALSE;
	m_hConnectEvent = NULL;
}

// Releases everything the connection holds by delegating to Shutdown().
SocketInConnection::~SocketInConnection()
{
	this->Shutdown();
}

// Prepares the connection for use: stores the target address, allocates the receive
// buffer and creates the event the read thread uses to report that it connected.
//
// strIP          - address to connect to; must not be NULL. Copied into m_strIP.
// nPort          - port to connect to.
// nMaxPacketSize - size of the largest packet expected; the receive buffer is
//                  sized as nMaxPacketSize * BUFFER_NUM_PACKETS.
//
// Returns XN_STATUS_OK on success, or the failure status of the step that failed.
XnStatus SocketInConnection::Init(const XnChar* strIP, XnUInt16 nPort, XnUInt16 nMaxPacketSize)
{
	XN_VALIDATE_INPUT_PTR(strIP);
	XnStatus nRetVal = XN_STATUS_OK;

	// Init() may be called on an object that already holds resources (a previous
	// Init(), possibly followed by Connect()). Release them first - otherwise the
	// old buffer and event would simply be overwritten below and leaked.
	if ((m_pBuffer != NULL) || (m_hConnectEvent != NULL) || (m_hReadThread != NULL))
	{
		Shutdown();
	}

	nRetVal = xnOSStrCopy(m_strIP, strIP, sizeof(m_strIP));
	XN_IS_STATUS_OK_LOG_ERROR("Copy IP", nRetVal);
	m_nPort = nPort;
	m_nMaxPacketSize = nMaxPacketSize;
	m_nBufferSize = m_nMaxPacketSize * BUFFER_NUM_PACKETS;
	m_pBuffer = reinterpret_cast<XnUInt8*>(xnOSMallocAligned(m_nBufferSize, XN_DEFAULT_MEM_ALIGN));
	XN_VALIDATE_ALLOC_PTR(m_pBuffer);
	// If creating the event fails, the buffer stays owned by this object and is
	// released by Shutdown() (called from the destructor or from the next Init()).
	nRetVal = xnOSCreateEvent(&m_hConnectEvent, FALSE);
	XN_IS_STATUS_OK_LOG_ERROR("Create event", nRetVal);
	xnLogVerbose(XN_MASK_LINK, "Event created for socket %u", m_nPort);
	return XN_STATUS_OK;
}

// Stops the read thread (if any) and releases the connect event and the receive buffer.
void SocketInConnection::Shutdown()
{
	xnLogVerbose(XN_MASK_LINK, "Socket in connection %u shutting down", m_nPort);

	// The read thread uses both the buffer and the event, so it has to be gone
	// before either of them is released.
	Disconnect();

	xnOSCloseEvent(&m_hConnectEvent);

	xnOSFreeAligned(m_pBuffer);
	m_pBuffer = NULL;
}

// Starts the read thread and waits (up to CONNECT_TIMEOUT) for it to signal the
// connect event. Any existing read thread is stopped first.
//
// Returns XN_STATUS_OK once the socket is connected and the read thread priority
// was raised; otherwise the status of the step that failed, or the connection
// status reported by the read thread.
XnStatus SocketInConnection::Connect()
{
	// In case we're already connected
	Disconnect();

	XnStatus nStatus = xnOSCreateThread(&ReadThreadProc, this, &m_hReadThread);
	XN_IS_STATUS_OK_LOG_ERROR("Create input socket read thread", nStatus);

	// The read thread performs the actual connect and signals the event when done.
	xnLogVerbose(XN_MASK_LINK, "Waiting for connection on socket %u...", m_nPort);
	nStatus = xnOSWaitEvent(m_hConnectEvent, CONNECT_TIMEOUT);
	XN_IS_STATUS_OK_LOG_ERROR("Wait for input socket to connect", nStatus);

	if (XN_STATUS_OK == m_nConnectionStatus)
	{
		xnLogVerbose(XN_MASK_LINK, "Socket %u connected.", m_nPort);
		nStatus = xnOSSetThreadPriority(m_hReadThread, XN_PRIORITY_CRITICAL);
		XN_IS_STATUS_OK_LOG_ERROR("Set read thread priority", nStatus);
		return XN_STATUS_OK;
	}

	// The event was signaled but the read thread did not report a live connection.
	xnLogError(XN_MASK_LINK, "Failed to connect to socket %u: %s", m_nPort, xnGetStatusString(m_nConnectionStatus));
	XN_ASSERT(FALSE);
	return m_nConnectionStatus;
}

// Reports whether the last connection status recorded by the read thread is XN_STATUS_OK.
XnBool SocketInConnection::IsConnected() const
{
	if (m_nConnectionStatus != XN_STATUS_OK)
	{
		return FALSE;
	}
	return TRUE;
}

void SocketInConnection::Disconnect()
{
	XnStatus nRetVal = XN_STATUS_OK;
	if (m_hReadThread != NULL)
	{
		m_bStopReadThread = TRUE; //Signal read thread to stop running
		nRetVal = xnOSWaitAndTerminateThread(&m_hReadThread, READ_THREAD_TERMINATE_TIMEOUT);
		if (nRetVal != XN_STATUS_OK)
		{
			xnLogWarning("Failed to terminate input socket read thread: %s", xnGetStatusString(nRetVal));
			XN_ASSERT(FALSE);
		}
		m_bStopReadThread = FALSE;
	}
}


// Returns the maximum packet size that was passed to Init() (0 before Init()).
XnUInt16 SocketInConnection::GetMaxPacketSize() const
{
	return this->m_nMaxPacketSize;
}

// Registers the object that receives incoming data and disconnection
// notifications from the read thread. Passing NULL clears the destination.
// Always returns XN_STATUS_OK.
XnStatus SocketInConnection::SetDataDestination(IDataDestination* pDataDestination)
{
	this->m_pDataDestination = pDataDestination;

	return XN_STATUS_OK;
}

// Thread entry point for the read thread. pThreadParam is the SocketInConnection
// instance handed to xnOSCreateThread() by Connect(); the actual work is done by
// ReadThreadProcImpl(), whose status is not propagated through the thread result.
XN_THREAD_PROC SocketInConnection::ReadThreadProc(XN_THREAD_PARAM pThreadParam)
{
	SocketInConnection* pThis = reinterpret_cast<SocketInConnection*>(pThreadParam);
	if (pThis == NULL)
	{
		xnLogError(XN_MASK_LINK, "Got NULL in socket read thread param :(");
		XN_ASSERT(FALSE);
		// Use the same return macro as the normal path so both exits produce the
		// thread procedure's return type the same way.
		XN_THREAD_PROC_RETURN(0);
	}

	pThis->ReadThreadProcImpl();

	XN_THREAD_PROC_RETURN(0);
}

// Body of the read thread: connects the socket, signals the connect event, then
// keeps receiving packets into m_pBuffer and forwarding them to the data
// destination until m_bStopReadThread is set or an error occurs.
//
// m_nConnectionStatus is updated with the result of the connect and of every
// packet receive, and ends up non-OK once the thread is done.
//
// Returns XN_STATUS_OK after a requested stop, otherwise the status of the failure
// that ended the thread.
XnStatus SocketInConnection::ReadThreadProcImpl()
{
	XnStatus nRetVal = XN_STATUS_OK;
	XnStatus nThreadStatus = XN_STATUS_OK; // first failure that ends the read loop
	XN_SOCKET_HANDLE hSocket = NULL;
	XnBool bCanceled = FALSE;
	XnUInt32 nPacketBytesRead = 0;
	XnUInt32 nTotalBytesRead = 0;

	m_nConnectionStatus = ConnectSocket(hSocket, m_strIP, m_nPort);

	// Wake Connect() whatever the outcome: it inspects m_nConnectionStatus after the
	// event fires, so a failed connect is reported right away instead of after the
	// full connect timeout.
	nRetVal = xnOSSetEvent(m_hConnectEvent);

	// NOTE(review): assumes ConnectSocket() leaves no open socket behind when it
	// fails, so nothing is closed on this path - confirm against its implementation.
	XN_IS_STATUS_OK_LOG_ERROR("Connect socket", m_nConnectionStatus);

	if (nRetVal != XN_STATUS_OK)
	{
		// Connected, but Connect() cannot be told. Skip the read loop and fall
		// through to the cleanup below so the socket is not leaked.
		xnLogError(XN_MASK_LINK, "Failed to set connect event: %s", xnGetStatusString(nRetVal));
		XN_ASSERT(FALSE);
		nThreadStatus = nRetVal;
	}

	while ((nThreadStatus == XN_STATUS_OK) && !m_bStopReadThread)
	{
		//Fill buffer with received packets
		nTotalBytesRead = 0;
		for (XnUInt32 nPacket = 0; (nPacket < BUFFER_NUM_PACKETS); nPacket++)
		{
			// In: room offered for this packet. Out: actual size of the packet received.
			nPacketBytesRead = m_nMaxPacketSize;
			m_nConnectionStatus = ReceivePacket(hSocket, m_pBuffer + nTotalBytesRead, nPacketBytesRead, bCanceled);
			if (m_nConnectionStatus != XN_STATUS_OK)
			{
				// A data destination is optional (see the check before IncomingData below).
				if (m_pDataDestination != NULL)
				{
					m_pDataDestination->HandleDisconnection();
				}
				xnLogError(XN_MASK_LINK, "Failed to receive packet: %s", xnGetStatusString(m_nConnectionStatus));
				//XN_ASSERT(FALSE);
				nThreadStatus = m_nConnectionStatus;
				break;
			}

			if (bCanceled)
			{
				//Ignore packet and exit loop
				break;
			}

			if (nTotalBytesRead == m_nBufferSize)
			{
				xnLogError(XN_MASK_LINK, "Read thread buffer overflowed :(");
				XN_ASSERT(FALSE);
				nThreadStatus = XN_STATUS_INTERNAL_BUFFER_TOO_SMALL;
				break;
			}

			nTotalBytesRead += nPacketBytesRead;
		}

		if (nThreadStatus != XN_STATUS_OK)
		{
			// On failure nothing gathered in this pass is delivered.
			break;
		}

		if (m_pDataDestination != NULL)
		{
			//Send data in buffer to its destination.
			//Even if at this point the read thread should be stopped, first we send all the complete packets we got.
			if (nTotalBytesRead > 0)
			{
				m_pDataDestination->IncomingData(m_pBuffer, nTotalBytesRead);
			}
		}
	}

	// Single cleanup point: the socket is closed on failure as well as on a requested stop.
	nRetVal = xnOSCloseSocket(hSocket);
	if (nRetVal != XN_STATUS_OK)
	{
		xnLogWarning(XN_MASK_LINK, "Failed to close input data socket :(");
		// After a failure the socket may already be unusable, so a failed close is
		// only treated as unexpected on the clean-stop path.
		if (nThreadStatus == XN_STATUS_OK)
		{
			XN_ASSERT(FALSE);
		}
	}

	// Keep a specific receive error if there is one; otherwise mark the connection closed.
	if (m_nConnectionStatus == XN_STATUS_OK)
	{
		m_nConnectionStatus = XN_STATUS_OS_NETWORK_CONNECTION_CLOSED;
	}

	return nThreadStatus;
}

// Receives one complete link packet (header followed by body) from hSocket.
//
// hSocket     - socket to read from.
// pDestBuffer - destination buffer; the packet header is written at its start.
// nSize       - in: number of bytes available in pDestBuffer.
//               out: size of the packet received, or 0 if no complete packet was
//               received (cancel or error).
// bCanceled   - set to TRUE when the receive was abandoned because the read thread
//               was asked to stop; the return value is XN_STATUS_OK in that case.
//
// Returns XN_STATUS_OK on success or cancel, XN_STATUS_INTERNAL_BUFFER_TOO_SMALL if
// the buffer cannot hold the header or the packet, XN_STATUS_ERROR for an invalid
// header (bad magic or impossible size), or the status of a failed receive.
XnStatus SocketInConnection::ReceivePacket(XN_SOCKET_HANDLE hSocket, void* pDestBuffer, XnUInt32& nSize, XnBool& bCanceled)
{
	XnStatus nRetVal = XN_STATUS_OK;
	LinkPacketHeader* pPacket = reinterpret_cast<LinkPacketHeader*>(pDestBuffer);
	const XnUInt32 nBufferSize = nSize;

	nSize = 0; //In case we get canceled or fail
	bCanceled = FALSE;

	if (nBufferSize < sizeof(LinkPacketHeader))
	{
		// Checked at run time (not only asserted) - receiving the header would
		// otherwise write past the end of the buffer in release builds.
		xnLogError(XN_MASK_LINK, "Insufficient buffer (%u bytes) to hold a link packet header", nBufferSize);
		XN_ASSERT(FALSE);
		return XN_STATUS_INTERNAL_BUFFER_TOO_SMALL;
	}

	/* We first receive the packet's header to know its size, and then receive exactly as many bytes as needed.
	   If we just received max packet size, we might overrun a smaller packet and receive part of the next packet.
	   (We don't have this problem with USB cuz we always get a whole packet there).*/

	nRetVal = ReceiveExactly(hSocket, pPacket, sizeof(LinkPacketHeader), bCanceled);
	if (bCanceled)
	{
		//The request to receive a packet was canceled
		return XN_STATUS_OK;
	}
	//XN_IS_STATUS_OK_LOG_ERROR("Receive packet header", nRetVal);
	XN_IS_STATUS_OK(nRetVal);

	if (!pPacket->IsMagicValid())
	{
		xnLogError(XN_MASK_LINK, "Got bad link packet header magic :(");
		XN_ASSERT(FALSE);
		return XN_STATUS_ERROR;
	}

	const XnUInt16 nPacketSize = pPacket->GetSize();
	if (nPacketSize < sizeof(LinkPacketHeader))
	{
		// The size comes from the peer. A value smaller than the header would make
		// the body length below wrap around to a huge unsigned number.
		xnLogError(XN_MASK_LINK, "Got link packet header with invalid size (%u bytes)", nPacketSize);
		XN_ASSERT(FALSE);
		return XN_STATUS_ERROR;
	}
	if (nBufferSize < nPacketSize)
	{
		xnLogError(XN_MASK_LINK, "Insufficient buffer (%u bytes) to hold packet of %u bytes", nBufferSize, nPacketSize);
		XN_ASSERT(FALSE);
		return XN_STATUS_INTERNAL_BUFFER_TOO_SMALL;
	}

	nRetVal = ReceiveExactly(hSocket, pPacket->GetPacketData(), nPacketSize - sizeof(LinkPacketHeader), bCanceled);
	if (bCanceled)
	{
		//The request to receive a packet was canceled.
		//Checked before the status (as for the header) so that a stop request is
		//never reported as a receive failure.
		return XN_STATUS_OK;
	}
	XN_IS_STATUS_OK_LOG_ERROR("Receive packet body", nRetVal);

	nSize = nPacketSize;

	return XN_STATUS_OK;
}

// Receives exactly nSize bytes from hSocket into pDestBuffer, retrying on receive
// timeouts until all bytes arrived or the read thread is asked to stop.
//
// hSocket     - socket to read from.
// pDestBuffer - destination buffer of at least nSize bytes.
// nSize       - number of bytes to receive.
// bCanceled   - set to TRUE if the loop ended because m_bStopReadThread was set
//               before all bytes were received, FALSE otherwise.
//
// Returns XN_STATUS_OK when all bytes were received or the receive was canceled,
// or the failure status of xnOSReceiveNetworkBuffer() (bCanceled is FALSE then).
XnStatus SocketInConnection::ReceiveExactly(XN_SOCKET_HANDLE hSocket, void* pDestBuffer, XnUInt32 nSize, XnBool& bCanceled)
{
	XnStatus nRetVal = XN_STATUS_OK;
	XnChar* const pDest = static_cast<XnChar*>(pDestBuffer);
	XnUInt32 nTotalBytesReceived = 0;
	XnUInt32 nIterationBytesReceived = 0;
	bCanceled = FALSE;
	while ((nTotalBytesReceived < nSize) && (!m_bStopReadThread))
	{
		// In: bytes still missing. Out: bytes received by this call.
		nIterationBytesReceived = (nSize - nTotalBytesReceived);
		nRetVal = xnOSReceiveNetworkBuffer(hSocket, pDest + nTotalBytesReceived, &nIterationBytesReceived, RECEIVE_TIMEOUT);
		if (nRetVal == XN_STATUS_OS_NETWORK_TIMEOUT)
		{
			//No data, no problem
			continue;
		}
		/*else if (nRetVal == XN_STATUS_OS_NETWORK_CONNECTION_CLOSED)
		{
			//This is ok - same as cancel
			break;
		}*/
		XN_IS_STATUS_OK(nRetVal);
		nTotalBytesReceived += nIterationBytesReceived;
	}

	if (nTotalBytesReceived < nSize)
	{
		//We didn't get all the data we expected - we were canceled.
		bCanceled = TRUE;
	}

	// Every real failure returned from inside the loop. The only non-OK value that
	// can still be in nRetVal here is a timeout from the last iteration before a
	// stop request, which is not an error - so report success.
	return XN_STATUS_OK;
}

}