File: PrioritizedRTPStreamSelector.cpp

package info (click to toggle)
liblivemedia 2005.04.01-1
  • links: PTS
  • area: main
  • in suites: sarge
  • size: 2,620 kB
  • ctags: 4,358
  • sloc: cpp: 33,542; ansic: 926; sh: 73; makefile: 62
file content (474 lines) | stat: -rw-r--r-- 15,737 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
**********/
// "liveMedia"
// Copyright (c) 1996-2004 Live Networks, Inc.  All rights reserved.
// Select from multiple, prioritized RTP streams, based on RTP sequence
// number, producing a single output stream
// Implementation

#include "PrioritizedRTPStreamSelector.hh"
#include "GroupsockHelper.hh"
#include <string.h>
#include <stdlib.h>

////////// PrioritizedInputStreamDescriptor //////////

// A data structure used to describe each input stream:
class PrioritizedInputStreamDescriptor {
public:
  PrioritizedInputStreamDescriptor(PrioritizedRTPStreamSelector*
				   ourSelector,
				   PrioritizedInputStreamDescriptor* next,
				   unsigned priority,
				   RTPSource* inputStream,
				   RTCPInstance* inputStreamRTCP);
  virtual ~PrioritizedInputStreamDescriptor();

  PrioritizedInputStreamDescriptor*& next() { return fNext; }
  unsigned priority() const { return fPriority; }
  RTPSource* rtpStream() const { return fRTPStream; }
  unsigned char*& buffer() { return fBuffer; }
  unsigned bufferSize() const { return fBufferSize; }

  void afterGettingFrame1(unsigned frameSize);
  void onSourceClosure1();

private:
  PrioritizedRTPStreamSelector* fOurSelector;
  PrioritizedInputStreamDescriptor* fNext;
  unsigned fPriority;
  RTPSource* fRTPStream;
  RTCPInstance* fRTCPStream;
  unsigned char* fBuffer; // where to put the next frame from this stream
  static unsigned const fBufferSize;
  unsigned fBufferBytesUsed;
};

static void afterGettingFrame(void* clientData, unsigned frameSize,
			      unsigned numTruncatedBytes,
			      struct timeval presentationTime,
			      unsigned durationInMicroseconds);
static void onSourceClosure(void* clientData);


////////// PacketWarehouse //////////

class WarehousedPacketDescriptor; // forward

class PacketWarehouse {
public:
  PacketWarehouse(unsigned seqNumStagger);
  virtual ~PacketWarehouse();

  Boolean isFull();
  void addNewFrame(unsigned priority, unsigned short rtpSeqNo,
		   unsigned char* buffer, unsigned frameSize);
  unsigned char* dequeueFrame(unsigned& resultFrameSize,
			      unsigned& uSecondsToDefer);

  Boolean fLastActionWasIncoming;
private:
  WarehousedPacketDescriptor* fPacketDescriptors;
  Boolean fHaveReceivedFrames;
  unsigned short fMinSeqNumStored, fMaxSeqNumStored;
  unsigned const fMinSpanForDelivery, fMaxSpanForDelivery, fNumDescriptors;
  struct timeval fLastArrivalTime; unsigned short fLastRTPSeqNo;
  unsigned fInterArrivalAveGap; // in microseconds
};

////////// PrioritizedRTPStreamSelector implementation //////////

PrioritizedRTPStreamSelector
::PrioritizedRTPStreamSelector(UsageEnvironment& env,
			       unsigned seqNumStagger)
  : FramedSource(env),
    fNextInputStreamPriority(0), fInputStreams(NULL),
    fAmCurrentlyReading(False), fNeedAFrame(False) {
  fWarehouse = new PacketWarehouse(seqNumStagger);
}

PrioritizedRTPStreamSelector::~PrioritizedRTPStreamSelector() {
  delete fWarehouse;

  while (fInputStreams != NULL) {
    PrioritizedInputStreamDescriptor* inputStream
      = fInputStreams;
    fInputStreams = inputStream->next();
    delete inputStream;
  }
}

PrioritizedRTPStreamSelector* PrioritizedRTPStreamSelector
::createNew(UsageEnvironment& env, unsigned seqNumStagger) {
  return new PrioritizedRTPStreamSelector(env, seqNumStagger);
}

unsigned PrioritizedRTPStreamSelector
::addInputRTPStream(RTPSource* inputStream,
		    RTCPInstance* inputStreamRTCP) {
  fInputStreams
    = new PrioritizedInputStreamDescriptor(this, fInputStreams,
					   fNextInputStreamPriority,
					   inputStream, inputStreamRTCP);
  return fNextInputStreamPriority++;
}

void PrioritizedRTPStreamSelector::removeInputRTPStream(unsigned priority) {
  for (PrioritizedInputStreamDescriptor*& inputStream
	 = fInputStreams;
       inputStream != NULL; inputStream = inputStream->next()) {
    if (inputStream->priority() == priority) {
      PrioritizedInputStreamDescriptor* toDelete
	= inputStream;
      inputStream->next() = toDelete->next();
      delete toDelete;
      break;
    }
  }
}

Boolean PrioritizedRTPStreamSelector
::lookupByName(UsageEnvironment& env,
	       char const* sourceName,
	       PrioritizedRTPStreamSelector*& resultSelector) {
  resultSelector = NULL; // unless we succeed

  FramedSource* source;
  if (!FramedSource::lookupByName(env, sourceName, source)) return False;

  if (!source->isPrioritizedRTPStreamSelector()) {
    env.setResultMsg(sourceName, " is not a Prioritized RTP Stream Selector");
    return False;
  }

  resultSelector = (PrioritizedRTPStreamSelector*)source;
  return True;
}

Boolean PrioritizedRTPStreamSelector
::isPrioritizedRTPStreamSelector() const {
  return True;
}

void PrioritizedRTPStreamSelector::doGetNextFrame() {
  // Begin the process of reading frames from our sources into the
  // frame 'warehouse' (unless this is already ongoing):
  startReadingProcess();

  // (Try to) give ourselves a frame from our warehouse:
  unsigned uSecondsToDefer;
  if (deliverFrameToClient(uSecondsToDefer)) {
    fNeedAFrame = False;

    // Complete the delivery.  If we were told to delay before doing
    // this, then schedule this as a delayed task:
    if (uSecondsToDefer > 0) {
      nextTask()
	= envir().taskScheduler().scheduleDelayedTask((int)uSecondsToDefer,
						      completeDelivery,
						      this);
    } else {
          completeDelivery(this);
    }
  } else {
    fNeedAFrame = True;
  }
}

void PrioritizedRTPStreamSelector::startReadingProcess() {
  if (fAmCurrentlyReading) return; // already ongoing
  if (fWarehouse->isFull()) return; // no room now for any more

  // Run through each input stream, requesting a new frame from it
  // (unless a previous read on this frame is still in progress).
  // When a frame arrives on one of the input streams, it will get
  // stored in our 'warehouse', for later delivery to us.
  for (PrioritizedInputStreamDescriptor* inputStream
	 = fInputStreams;
       inputStream != NULL; inputStream = inputStream->next()) {
    RTPSource* rtpStream = inputStream->rtpStream();
    if (!rtpStream->isCurrentlyAwaitingData()) {
      // Read a frame into this stream's descriptor's buffer:
      fAmCurrentlyReading = True;
      rtpStream->getNextFrame(inputStream->buffer(),
			      inputStream->bufferSize(),
			      afterGettingFrame, inputStream,
			      onSourceClosure, inputStream);
    }
  }
}

void PrioritizedRTPStreamSelector
::handleNewIncomingFrame(unsigned priority, unsigned short rtpSeqNo,
			 unsigned char* buffer, unsigned frameSize) {
  // Begin by adding this new frame to the warehouse:
  fWarehouse->addNewFrame(priority, rtpSeqNo, buffer, frameSize);
  fWarehouse->fLastActionWasIncoming = True;

  // Try again to deliver a frame for our client (if he still wants one):
  if (fNeedAFrame) {
    doGetNextFrame();
  }

  // Continue the reading process:
  fAmCurrentlyReading = False;
  startReadingProcess();
}

Boolean PrioritizedRTPStreamSelector
::deliverFrameToClient(unsigned& uSecondsToDefer) {
  unsigned char* buffer
    = fWarehouse->dequeueFrame(fFrameSize, uSecondsToDefer);

  if (buffer != NULL) {
    // A frame was available
    if (fFrameSize > fMaxSize) {
      fNumTruncatedBytes = fFrameSize - fMaxSize;
      fFrameSize = fMaxSize;
    }
    memmove(fTo, buffer, fFrameSize);

    delete[] buffer;
    fWarehouse->fLastActionWasIncoming = False;
    return True;
  }

  // No frame was available.
  return False;
}


void PrioritizedRTPStreamSelector::completeDelivery(void* clientData) {
  PrioritizedRTPStreamSelector* selector
    = (PrioritizedRTPStreamSelector*)clientData;

  // Call our own 'after getting' function.  Because we're not a 'leaf'
  // source, we can call this directly, without risking infinite recursion.
  FramedSource::afterGetting(selector);
}


//////// PrioritizedInputStreamDescriptor implementation ////////

unsigned const PrioritizedInputStreamDescriptor::fBufferSize = 4000;

PrioritizedInputStreamDescriptor
::PrioritizedInputStreamDescriptor(PrioritizedRTPStreamSelector*
				   ourSelector,
				   PrioritizedInputStreamDescriptor* next,
				   unsigned priority,
				   RTPSource* inputStream,
				   RTCPInstance* inputStreamRTCP)
    : fOurSelector(ourSelector), fNext(next), fPriority(priority),
      fRTPStream(inputStream), fRTCPStream(inputStreamRTCP) {
  fBuffer = new unsigned char[fBufferSize];
  fBufferBytesUsed = 0;
}

PrioritizedInputStreamDescriptor::~PrioritizedInputStreamDescriptor() {
  delete[] fBuffer;
}

static void afterGettingFrame(void* clientData, unsigned frameSize,
			      unsigned /*numTruncatedBytes*/,
			      struct timeval /*presentationTime*/,
			      unsigned /*durationInMicroseconds*/) {
  PrioritizedInputStreamDescriptor* inputStream
    = (PrioritizedInputStreamDescriptor*)clientData;
  inputStream->afterGettingFrame1(frameSize);
}

void PrioritizedInputStreamDescriptor
::afterGettingFrame1(unsigned frameSize) {
  unsigned short rtpSeqNo = rtpStream()->curPacketRTPSeqNum();
  // Deliver this frame to our selector:
  fOurSelector->handleNewIncomingFrame(fPriority, rtpSeqNo,
				       fBuffer, frameSize);
}

static void onSourceClosure(void* clientData) {
  PrioritizedInputStreamDescriptor* inputStream
    = (PrioritizedInputStreamDescriptor*)clientData;
  inputStream->onSourceClosure1();
}

void PrioritizedInputStreamDescriptor::onSourceClosure1() {
  fOurSelector->removeInputRTPStream(fPriority);
}


////////// WarehousedPacketDescriptor //////////

class WarehousedPacketDescriptor {
public:
  WarehousedPacketDescriptor() : buffer(NULL) {}
  // Don't define a destructor; for some reason it causes a crash #####

  unsigned priority;
  unsigned frameSize;
  unsigned char* buffer;
};


////////// PacketWarehouse implementation 

PacketWarehouse::PacketWarehouse(unsigned seqNumStagger)
  : fLastActionWasIncoming(False),
    fHaveReceivedFrames(False), fMinSeqNumStored(0), fMaxSeqNumStored(0),
    fMinSpanForDelivery((unsigned)(1.5*seqNumStagger)),
    fMaxSpanForDelivery(3*seqNumStagger),
    fNumDescriptors(4*seqNumStagger),
    fInterArrivalAveGap(0) {
  fPacketDescriptors = new WarehousedPacketDescriptor[fNumDescriptors];
  if (fPacketDescriptors == NULL) {
#ifdef DEBUG
    fprintf(stderr, "PacketWarehouse failed to allocate %d descriptors; seqNumStagger too large!\n", fNumDescriptors);
#endif
    exit(1);
  }

  // Initially, set "fLastArrivalTime" to the current time:
  gettimeofday(&fLastArrivalTime, NULL);
}

PacketWarehouse::~PacketWarehouse() {
  // Delete each descriptor's buffer (if any), then delete the descriptors:
  for (unsigned i = 0; i < fNumDescriptors; ++i) {
    delete[] fPacketDescriptors[i].buffer;
  }
  delete[] fPacketDescriptors;
}

Boolean PacketWarehouse::isFull() {
  int currentSpan = fMaxSeqNumStored - fMinSeqNumStored;
  if (currentSpan < 0) currentSpan += 65536;

  return (unsigned)currentSpan >= fNumDescriptors;
}

void PacketWarehouse::addNewFrame(unsigned priority,
				  unsigned short rtpSeqNo,
				  unsigned char* buffer,
				  unsigned frameSize) {
  if (!fHaveReceivedFrames) {
    // This is our first frame; set up initial parameters:
    // (But note hack: We want the first frame to have priority 0, so that
    //  receivers' decoders are happy, by seeing the 'best' data initially)
    if (priority != 0) return;

    fMinSeqNumStored = fMaxSeqNumStored = rtpSeqNo;
    fHaveReceivedFrames = True;
  } else {
    // Update our record of the maximum sequence number stored
    if (seqNumLT(fMaxSeqNumStored, rtpSeqNo)) {
      fMaxSeqNumStored = rtpSeqNo;
    } else if (seqNumLT(rtpSeqNo, fMinSeqNumStored)) {
      return; // ignore this packet; it's too old for us
    }
  }

  if (isFull()) {
    // We've gotten way ahead of ourselves, probably due to a very large
    // set of consecutive lost packets.  To recover, reset our min/max
    // sequence numbers (effectively emptying the warehouse).  Hopefully,
    // this should be an unusual occurrence:
    fMinSeqNumStored = fMaxSeqNumStored = rtpSeqNo;
  }

  // Check whether a frame with this sequence number has already been seen
  WarehousedPacketDescriptor& desc
    = fPacketDescriptors[rtpSeqNo%fNumDescriptors];
  if (desc.buffer != NULL) {
    // We already have a frame.  If it's priority is higher than that of
    // this new frame, then we continue to use it:
    if (desc.priority < priority) return; // lower than existing priority

    // Otherwise, use the new frame instead, so get rid of the existing one:
    delete[] desc.buffer;
  }

  // Record this new frame:
  desc.buffer = new unsigned char[frameSize];
  if (desc.buffer == NULL) {
#ifdef DEBUG
    fprintf(stderr, "PacketWarehouse::addNewFrame failed to allocate %d-byte buffer!\n", frameSize);
#endif
    exit(1);
  }
  memmove(desc.buffer, buffer, frameSize);
  desc.frameSize = frameSize;
  desc.priority = priority;

  struct timeval timeNow;
  gettimeofday(&timeNow, NULL);

  if (rtpSeqNo == (fLastRTPSeqNo+1)%65536) {
    // We've received consecutive packets, so update the estimate of
    // the average inter-arrival gap:
    unsigned lastGap // in microseconds
      = (timeNow.tv_sec - fLastArrivalTime.tv_sec)*1000000
      + (timeNow.tv_usec - fLastArrivalTime.tv_usec);

    fInterArrivalAveGap = (9*fInterArrivalAveGap + lastGap)/10; // weighted
  }
  fLastArrivalTime = timeNow;
  fLastRTPSeqNo = rtpSeqNo;
}

unsigned char* PacketWarehouse::dequeueFrame(unsigned& resultFrameSize,
					     unsigned& uSecondsToDefer) {
  uSecondsToDefer = 0; // by default

  // Don't return anything if we don't yet have enough packets to
  // cover our desired sequence number span
  int currentSpan = fMaxSeqNumStored - fMinSeqNumStored;
  if (currentSpan < 0) currentSpan += 65536;
  if (currentSpan < (int)fMinSpanForDelivery) {
    return NULL; // we're not ready
  }

  // Now, if our stored packet range is less than the desired maximum,
  // return a frame, but delay this unless no incoming frames have
  // arrived since the last time a frame was delivered to our client.
  // This causes the buffer to empty only if the flow of incoming
  // frames stops.
  if (currentSpan < (int)fMaxSpanForDelivery) {
    if (fLastActionWasIncoming) {
      uSecondsToDefer = (unsigned)(1.5*fInterArrivalAveGap);
    }
  }

  // Now, return a frame:
  unsigned char* resultBuffer = NULL;
  do {
    if (currentSpan < (int)fMinSpanForDelivery) break;

    WarehousedPacketDescriptor& desc
      = fPacketDescriptors[fMinSeqNumStored%fNumDescriptors];
    resultBuffer = desc.buffer;
    resultFrameSize = desc.frameSize;

    desc.buffer = NULL;
#ifdef DEBUG
    if (resultBuffer == NULL) fprintf(stderr, "No packet for seq num %d - skipping\n", fMinSeqNumStored); //#####
  else if (desc.priority == 2) fprintf(stderr, "Using priority %d frame for seq num %d\n", desc.priority, fMinSeqNumStored);//#####
#endif
    fMinSeqNumStored = (fMinSeqNumStored+1)%65536;
    --currentSpan;
  } while (resultBuffer == NULL); // skip over missing packets

  return resultBuffer;
}