File: RTPInterface.cpp

package info (click to toggle)
liblivemedia 2006.03.17-2
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 2,928 kB
  • ctags: 4,588
  • sloc: cpp: 35,064; ansic: 979; makefile: 78; sh: 73
file content (374 lines) | stat: -rw-r--r-- 12,962 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
**********/
// "liveMedia"
// Copyright (c) 1996-2005 Live Networks, Inc.  All rights reserved.
// An abstraction of a network interface used for RTP (or RTCP).
// (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to
// be implemented transparently.)
// Implementation

#include "RTPInterface.hh"
#include <GroupsockHelper.hh>
#include <stdio.h>

////////// Helper Functions - Definition //////////

// Helper routines and data structures, used to implement
// sending/receiving RTP/RTCP over a TCP socket:

static void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
			   int socketNum, unsigned char streamChannelId);

// Reading RTP-over-TCP is implemented using two levels of hash tables.
// The top-level hash table maps TCP socket numbers to a
// "SocketDescriptor" that contains a hash table for each of the
// sub-channels that are reading from this socket.

static HashTable* socketHashTable(UsageEnvironment& env) {
  _Tables* ourTables = _Tables::getOurTables(env);
  if (ourTables->socketTable == NULL) {
    // Create a new socket number -> SocketDescriptor mapping table:
    ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
  }
  return (HashTable*)(ourTables->socketTable);
}

class SocketDescriptor {
public:
  SocketDescriptor(UsageEnvironment& env, int socketNum);
  virtual ~SocketDescriptor();

  void registerRTPInterface(unsigned char streamChannelId,
			    RTPInterface* rtpInterface);
  RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
  void deregisterRTPInterface(unsigned char streamChannelId);

private:
  static void tcpReadHandler(SocketDescriptor*, int mask);

private:
  UsageEnvironment& fEnv;
  int fOurSocketNum;
  HashTable* fSubChannelHashTable;
};

static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env,
						int sockNum) {
  char const* key = (char const*)(long)sockNum;
  return (SocketDescriptor*)(socketHashTable(env)->Lookup(key));
}

static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
  char const* key = (char const*)(long)sockNum;
  HashTable* table = socketHashTable(env);
  table->Remove(key);

  if (table->IsEmpty()) {
    // We can also delete the table (to reclaim space):
    _Tables* ourTables = _Tables::getOurTables(env);
    delete table;
    ourTables->socketTable = NULL;
    ourTables->reclaimIfPossible();
  }
}


////////// RTPInterface - Implementation //////////

RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
  : fOwner(owner), fGS(gs),
    fTCPStreams(NULL),
    fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
    fReadHandlerProc(NULL),
    fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
}

RTPInterface::~RTPInterface() {
  delete fTCPStreams;
}

Boolean RTPOverTCP_OK = True; // HACK: For detecting TCP socket failure externally #####

void RTPInterface::setStreamSocket(int sockNum,
				   unsigned char streamChannelId) {
  fGS->removeAllDestinations();
  addStreamSocket(sockNum, streamChannelId);
}

void RTPInterface::addStreamSocket(int sockNum,
				   unsigned char streamChannelId) {
  if (sockNum < 0) return;
  else RTPOverTCP_OK = True; //##### HACK

  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
       streams = streams->fNext) {
    if (streams->fStreamSocketNum == sockNum
	&& streams->fStreamChannelId == streamChannelId) {
      return; // we already have it
    }
  }

  fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
}

void RTPInterface::removeStreamSocket(int sockNum,
				      unsigned char streamChannelId) {
  for (tcpStreamRecord** streamsPtr = &fTCPStreams; *streamsPtr != NULL;
       streamsPtr = &((*streamsPtr)->fNext)) {
    if ((*streamsPtr)->fStreamSocketNum == sockNum
	&& (*streamsPtr)->fStreamChannelId == streamChannelId) {
      // Remove the record pointed to by *streamsPtr :
      tcpStreamRecord* next = (*streamsPtr)->fNext;
      (*streamsPtr)->fNext = NULL;
      delete (*streamsPtr);
      *streamsPtr = next;
      return;
    }
  }
}

void RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
  // Normal case: Send as a UDP packet:
  fGS->output(envir(), fGS->ttl(), packet, packetSize);

  // Also, send over each of our TCP socket:
  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
       streams = streams->fNext) {
    sendRTPOverTCP(packet, packetSize,
		   streams->fStreamSocketNum, streams->fStreamChannelId);
  }
}

void RTPInterface
::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
  // Normal case: Arrange to read UDP packets:
  envir().taskScheduler().
    turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);

  // Also, receive RTP over TCP, on each of our TCP connections:
  fReadHandlerProc = handlerProc;
  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
       streams = streams->fNext) {
    // Get a socket descriptor for "streams->fStreamSocketNum":
    SocketDescriptor* socketDescriptor
      = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
    if (socketDescriptor == NULL) {
      socketDescriptor
	= new SocketDescriptor(envir(), streams->fStreamSocketNum);
      socketHashTable(envir())->Add((char const*)(long)(streams->fStreamSocketNum),
				    socketDescriptor);
    }

    // Tell it about our subChannel:
    socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
  }
}

Boolean RTPInterface::handleRead(unsigned char* buffer,
				 unsigned bufferMaxSize,
				 unsigned& bytesRead,
				 struct sockaddr_in& fromAddress) {
  Boolean readSuccess;
  if (fNextTCPReadStreamSocketNum < 0) {
    // Normal case: read from the (datagram) 'groupsock':
    readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
  } else {
    // Read from the TCP connection:
    bytesRead = 0;
    unsigned totBytesToRead = fNextTCPReadSize;
    if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize; 
    unsigned curBytesToRead = totBytesToRead;
    unsigned curBytesRead;
    while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
				      &buffer[bytesRead], curBytesToRead,
				      fromAddress)) > 0) {
      bytesRead += curBytesRead;
      if (bytesRead >= totBytesToRead) break;
      curBytesToRead -= curBytesRead;
    }
    if (curBytesRead <= 0) {
      bytesRead = 0;
      readSuccess = False;
      RTPOverTCP_OK = False; // HACK #####
    } else {
      readSuccess = True;
    }
    fNextTCPReadStreamSocketNum = -1; // default, for next time
  }

  if (readSuccess && fAuxReadHandlerFunc != NULL) {
    // Also pass the newly-read packet data to our auxilliary handler:
    (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
  }
  return readSuccess;
}

void RTPInterface::stopNetworkReading() {
  // Normal case
  envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());

  // Also turn off read handling on each of our TCP connections:
  for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
       streams = streams->fNext) {
    SocketDescriptor* socketDescriptor
      = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
    if (socketDescriptor != NULL) {
      socketDescriptor->deregisterRTPInterface(streams->fStreamChannelId);
        // Note: This may delete "socketDescriptor",
        // if no more interfaces are using this socket
    }
  }
}


////////// Helper Functions - Implementation /////////

void sendRTPOverTCP(unsigned char* packet, unsigned packetSize,
                    int socketNum, unsigned char streamChannelId) {
#ifdef DEBUG
  fprintf(stderr, "sendRTPOverTCP: %d bytes over channel %d (socket %d)\n",
	  packetSize, streamChannelId, socketNum); fflush(stderr);
#endif
  // Send RTP over TCP, using the encoding defined in
  // RFC 2326, section 10.12:
  do {
    char const dollar = '$';
    if (send(socketNum, &dollar, 1, 0) != 1) break;
    if (send(socketNum, (char*)&streamChannelId, 1, 0) != 1) break;

    char netPacketSize[2];
    netPacketSize[0] = (char) ((packetSize&0xFF00)>>8);
    netPacketSize[1] = (char) (packetSize&0xFF);
    if (send(socketNum, netPacketSize, 2, 0) != 2) break;

    if (send(socketNum, (char*)packet, packetSize, 0) != (int)packetSize) break;

#ifdef DEBUG
    fprintf(stderr, "sendRTPOverTCP: completed\n"); fflush(stderr);
#endif

    return;
  } while (0);

  RTPOverTCP_OK = False; // HACK #####
#ifdef DEBUG
  fprintf(stderr, "sendRTPOverTCP: failed!\n"); fflush(stderr);
#endif
}

SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
  : fEnv(env), fOurSocketNum(socketNum),
    fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)) {
}

SocketDescriptor::~SocketDescriptor() {
  delete fSubChannelHashTable;
}

void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
					    RTPInterface* rtpInterface) {
  Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
  fSubChannelHashTable->Add((char const*)(long)streamChannelId,
			    rtpInterface);

  if (isFirstRegistration) {
    // Arrange to handle reads on this TCP socket:
    TaskScheduler::BackgroundHandlerProc* handler
      = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
    fEnv.taskScheduler().
      turnOnBackgroundReadHandling(fOurSocketNum, handler, this);
  }
}

RTPInterface* SocketDescriptor
::lookupRTPInterface(unsigned char streamChannelId) {
  char const* lookupArg = (char const*)(long)streamChannelId;
  return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
}

void SocketDescriptor
::deregisterRTPInterface(unsigned char streamChannelId) {
  fSubChannelHashTable->Remove((char const*)(long)streamChannelId);

  if (fSubChannelHashTable->IsEmpty()) {
    // No more interfaces are using us, so it's curtains for us now
    fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
    removeSocketDescription(fEnv, fOurSocketNum);
    delete this;
  }
}

void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor,
				      int mask) {
  do {
    UsageEnvironment& env = socketDescriptor->fEnv; // abbrev
    int socketNum = socketDescriptor->fOurSocketNum;

    // Begin by reading and discarding any characters that aren't '$'.
    // Any such characters are probably regular RTSP responses or
    // commands from the server.  At present, we can't do anything with
    // these, because we have taken complete control of reading this socket.
    // (Later, fix) #####
    unsigned char c;
    struct sockaddr_in fromAddress;
    do {
      if (readSocket(env, socketNum, &c, 1, fromAddress) != 1) { // error reading TCP socket
	env.taskScheduler().turnOffBackgroundReadHandling(socketNum); // stops further calls to us
	return;
      }
    } while (c != '$');

    // The next byte is the stream channel id:
    unsigned char streamChannelId;
    if (readSocket(env, socketNum, &streamChannelId, 1, fromAddress)
	!= 1) break;
    RTPInterface* rtpInterface
      = socketDescriptor->lookupRTPInterface(streamChannelId);
    if (rtpInterface == NULL) break; // we're not interested in this channel

    // The next two bytes are the RTP or RTCP packet size (in network order)
    unsigned short size;
    if (readSocketExact(env, socketNum, (unsigned char*)&size, 2,
			fromAddress) != 2) break;
    rtpInterface->fNextTCPReadSize = ntohs(size);
    rtpInterface->fNextTCPReadStreamSocketNum = socketNum;
#ifdef DEBUG
    fprintf(stderr, "SocketDescriptor::tcpReadHandler() reading %d bytes on channel %d\n", rtpInterface->fNextTCPReadSize, streamChannelId);
#endif

    // Now that we have the data set up, call this subchannel's
    // read handler:
    if (rtpInterface->fReadHandlerProc != NULL) {
      rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
    }

  } while (0);
}


////////// tcpStreamRecord implementation //////////

tcpStreamRecord
::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
		  tcpStreamRecord* next)
  : fNext(next),
    fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
}

tcpStreamRecord::~tcpStreamRecord() {
  delete fNext;
}