1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
|
/*
* Worldvisions Tunnel Vision Software:
* Copyright (C) 1997-2002 Net Integration Technologies, Inc.
*
* WvEncoderStream chains a series of encoders on the input and
* output ports of the underlying stream to effect on-the-fly data
* transformations.
*/
#include "wvencoderstream.h"
WvEncoderStream::WvEncoderStream(WvStream *_cloned) : WvStreamClone(_cloned)
{
is_closing = false;
is_eof = false;
min_readsize = 0;
}
WvEncoderStream::~WvEncoderStream()
{
close();
}
void WvEncoderStream::close()
{
// we want to finish the encoders even if !isok() since we
// might just have encountered an EOF condition, and we want
// to ensure that the remaining data is processed, but this
// might cause recursion if the encoders set a new error condition
if (is_closing) return;
is_closing = true;
// finish encoders
finish_read();
finish_write();
// flush write chain and close the stream
WvStreamClone::close();
}
bool WvEncoderStream::isok() const
{
// handle encoder error conditions
if (! WvStream::isok())
return false;
// handle substream error conditions
// we don't check substream isok() because that is handled
// during read operations to distinguish EOF from errors
if (! cloned || cloned->geterr() != 0)
return false;
// handle deferred EOF condition
return ! is_eof;
}
bool WvEncoderStream::flush_internal(time_t msec_timeout)
{
flush_write();
// flush underlying stream
while (isok() && writeoutbuf.used())
{
WvEncoderStream::flush(msec_timeout);
if (!msec_timeout || !select(msec_timeout, false, true))
{
if (msec_timeout >= 0)
break;
}
}
return !writeoutbuf.used();
}
bool WvEncoderStream::flush_read()
{
bool success = readchain.flush(readinbuf, readoutbuf);
checkreadisok();
inbuf.merge(readoutbuf);
return success;
}
bool WvEncoderStream::flush_write()
{
bool success = push(true /*flush*/, false /*finish*/);
return success;
}
bool WvEncoderStream::finish_read()
{
bool success = readchain.flush(readinbuf, readoutbuf);
if (! readchain.finish(readoutbuf))
success = false;
checkreadisok();
inbuf.merge(readoutbuf);
is_eof = true;
return success;
}
bool WvEncoderStream::finish_write()
{
return push(true /*flush*/, true /*finish*/);
}
void WvEncoderStream::pull(size_t size)
{
if (is_eof)
return;
// pull a chunk of unencoded input
bool finish = false;
if (! readchain.isfinished() && cloned)
{
if (size != 0)
cloned->read(readinbuf, size);
if (! cloned->isok())
finish = true; // underlying stream hit EOF or error
}
// encode the input
readchain.encode(readinbuf, readoutbuf, finish /* flush*/);
if (finish)
{
readchain.finish(readoutbuf);
if (readoutbuf.used() == 0 && inbuf.used() == 0)
is_eof = true;
// otherwise defer EOF until the buffered data has been read
}
else if (readoutbuf.used() == 0 && inbuf.used() == 0 && readchain.isfinished())
{
// only get EOF when the chain is finished and we have no
// more data
is_eof = true;
}
checkreadisok();
}
bool WvEncoderStream::push(bool flush, bool finish)
{
// encode the output
if (flush)
writeinbuf.merge(outbuf);
bool success = writechain.encode(writeinbuf, writeoutbuf, flush);
if (finish)
if (! writechain.finish(writeoutbuf))
success = false;
checkwriteisok();
// push encoded output to cloned stream
size_t size = writeoutbuf.used();
if (size != 0)
{
const unsigned char *writeout = writeoutbuf.get(size);
size_t len = WvStreamClone::uwrite(writeout, size);
writeoutbuf.unget(size - len);
}
return success;
}
size_t WvEncoderStream::uread(void *buf, size_t size)
{
if (size && readoutbuf.used() == 0)
{
pull(min_readsize > size ? min_readsize : size);
}
size_t avail = readoutbuf.used();
if (size > avail)
size = avail;
readoutbuf.move(buf, size);
return size;
}
size_t WvEncoderStream::uwrite(const void *buf, size_t size)
{
writeinbuf.put(buf, size);
push(false /*flush*/, false /*finish*/);
return size;
}
bool WvEncoderStream::pre_select(SelectInfo &si)
{
bool surething = false;
// if we have buffered input data and we want to check for
// readability, then cause a callback to occur that will
// hopefully ask us for more data via uread()
if (si.wants.readable)
{
pull(0); // try an encode
if (readoutbuf.used() != 0)
surething = true;
}
// try to push pending encoded output to cloned stream
// outbuf_delayed_flush condition already handled by uwrite()
push(false /*flush*/, false /*finish*/);
// consult the underlying stream
if (WvStreamClone::pre_select(si))
surething = true;
return surething;
}
void WvEncoderStream::checkreadisok()
{
if (! readchain.isok())
{
seterr(WvString("read chain: %s", readchain.geterror()));
is_eof = true;
}
}
void WvEncoderStream::checkwriteisok()
{
if (! writechain.isok())
seterr(WvString("write chain: %s", writechain.geterror()));
}
|