/*
 * Worldvisions Weaver Software:
 *   Copyright (C) 2002 Net Integration Technologies, Inc.
 *
 * WvSyncStream throttles reads from its input to the specified rate
 * (bps, in bytes per second).  It only becomes readable at periodic
 * intervals.
 */
#include "wvsyncstream.h"
#include "wvtimeutils.h"
WvSyncStream::WvSyncStream(WvStream *_cloned, size_t _bps,
                           size_t _avgchunk, size_t _maxchunk) :
    WvStreamClone(_cloned)
{
    init(_bps, _avgchunk, _maxchunk);
}
WvSyncStream::WvSyncStream(WvStream *_cloned, bool _owner, int _srate,
                           int _bits, int _msec) : WvStreamClone(_cloned)
{
    // bytes per second, from the sample rate and the bits per sample
    size_t _bps = _srate * _bits / 8;
    // bytes in one chunk lasting _msec milliseconds
    size_t _avgchunk = _bps * _msec / 1000;
    init(_bps, _avgchunk, _avgchunk * 5); // arbitrary choice
    disassociate_on_close = !_owner;
}
WvSyncStream::~WvSyncStream()
{
    close();
}
void WvSyncStream::init(size_t _bps, size_t _avgchunk, size_t _maxchunk)
{
    bps = _bps;
    avgchunk = _avgchunk;
    maxchunk = _maxchunk;

    // allow +/- 50% tolerance
    // FIXME: this is a purely arbitrary number
    size_t tol = avgchunk / 2;
    lowater = avgchunk - tol;
    hiwater = avgchunk + tol;
    waiting = false;
    resettimer();
    force_select(true, false, false);
}
size_t WvSyncStream::uread(void *buf, size_t count)
{
    poll();
    if (count > availchunk)
        count = availchunk;
    if (availchunk == 0)
        return 0; // try again later

    size_t len = WvStreamClone::uread(buf, count);
    availchunk -= len;
    usedchunk += len;
    return len;
}
bool WvSyncStream::pre_select(SelectInfo &si)
{
    if (waiting)
    {
        poll();
        if (availchunk < lowater)
        {
            time_t timeout = (hiwater - availchunk) * 1000 / bps + 1;
            if (timeout > 0)
            {
                if (timeout < si.msec_timeout || si.msec_timeout < 0)
                    si.msec_timeout = timeout;
                return false;
            }
        }
        waiting = false;
        return true; // we know we had data
    }
    return WvStreamClone::pre_select(si);
}
bool WvSyncStream::post_select(SelectInfo &si)
{
    bool havedata = WvStreamClone::post_select(si);
    if (havedata && si.wants.readable)
    {
        poll();
        if (availchunk < lowater)
        {
            // not enough data to care about right now
            waiting = true;
            return false;
        }
    }
    return havedata;
}
void WvSyncStream::poll()
{
    // we advance the reference time periodically to avoid
    // overflows during some integer calculations
    const int WINDOW = 10;

    // how long has it been since we started
    struct timeval now;
    gettimeofday(&now, NULL);
    time_t msec = msecdiff(now, reference);
    if (msec > WINDOW * 1000 * 2)
    {
        // avoid overflow by adjusting reference time
        reference.tv_sec += WINDOW;
        msec -= WINDOW * 1000;
        size_t consume = WINDOW * bps;
        if (usedchunk >= consume)
            usedchunk -= consume;
        else
            usedchunk = 0; // got very far behind reading?
    }
    else if (msec < 0)
    {
        // reference clock is confused!
        resettimer();
        return;
    }

    // how much can we read?
    size_t totalchunk = bps * msec / 1000;
    availchunk = totalchunk - usedchunk;
    if (availchunk > maxchunk)
    {
        // resynchronize after a long delay
        availchunk = maxchunk;
        usedchunk = totalchunk - availchunk;
    }
}
void WvSyncStream::resettimer()
{
    // make full chunk available immediately
    gettimeofday(&reference, NULL);
    reference.tv_sec -= 1;
    availchunk = hiwater;
    usedchunk = bps - availchunk;
}
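/*
 * Illustrative sketch, not part of the original file: the byte budget
 * that poll() recomputes can be modelled as a standalone function.
 * The names SyncBudget and sync_budget are hypothetical and exist only
 * for this example.  Unlike poll(), this sketch saturates the
 * subtraction at zero rather than relying on unsigned wraparound, and
 * it leaves out the periodic adjustment of the reference time.
 */
```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for the availchunk/usedchunk pair kept by the stream.
struct SyncBudget
{
    size_t avail; // bytes that may be read right now
    size_t used;  // bytes already charged against the elapsed time
};

// Recompute the budget after `msec` milliseconds at `bps` bytes per second.
// Assumption of this sketch: the subtraction saturates at zero.
SyncBudget sync_budget(size_t bps, long msec, size_t used, size_t maxchunk)
{
    assert(msec >= 0);
    size_t total = bps * static_cast<size_t>(msec) / 1000;

    SyncBudget b;
    b.used = used;
    b.avail = total > used ? total - used : 0;
    if (b.avail > maxchunk)
    {
        // resynchronize after a long delay: drop the backlog
        b.avail = maxchunk;
        b.used = total - maxchunk;
    }
    return b;
}
```
/*
 * With bps = 8000 and maxchunk = 4000, 250 ms elapsed and 1500 bytes
 * used leave 500 bytes available.  After 5000 ms the budget is capped
 * at maxchunk, and used is moved forward so that the backlog is lost.
 */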