
|
/*
* Worldvisions Tunnel Vision Software:
* Copyright (C) 1997-2002 Net Integration Technologies, Inc.
*
* WvEncoderStream chains a series of encoders on the input and
* output ports of the underlying stream to effect on-the-fly data
* transformations.
*/
#include "wvencoderstream.h"
// Builds an encoder stream on top of _cloned, which is handed straight
// to the WvStreamClone base.  The stream starts with no minimum read
// size and with no close() in progress.
WvEncoderStream::WvEncoderStream(WvStream *_cloned)
    : WvStreamClone(_cloned)
{
    min_readsize = 0;
    is_closing = false;
}
// Destroying the stream closes it first, so both encoder chains get
// finished (see close()).
WvEncoderStream::~WvEncoderStream()
{
    this->close();
}
// Finishes the read and write encoder chains, then lets
// WvStreamClone::close() flush the write side and close the stream.
//
// The encoders are finished even when !isok(): we may simply have hit
// EOF and still need the remaining data processed.  Finishing can set a
// new error condition, which might lead back into close(); is_closing
// turns any such nested call into a no-op.
void WvEncoderStream::close()
{
    if (!is_closing)
    {
        is_closing = true;

        // wind down both encoder chains
        finish_read();
        finish_write();

        // flush whatever is pending and close the stream itself
        WvStreamClone::close();
    }
}
// Returns true only when WvStream::isok() holds (this is where encoder
// errors show up), a cloned stream is attached, and that stream reports
// no error code.
//
// The substream's own isok() is deliberately not consulted here: that
// check happens during read operations so that EOF can be told apart
// from a real error.
bool WvEncoderStream::isok() const
{
    if (WvStream::isok() && cloned)
        return cloned->geterr() == 0;
    return false;
}
// Pushes all pending output through the write chain, then delegates the
// actual flush to WvStreamClone::flush_internal() with the same timeout.
// The result of flush_write() is not part of the return value; only the
// base class's result is reported.
bool WvEncoderStream::flush_internal(time_t msec_timeout)
{
    flush_write();
    const bool flushed = WvStreamClone::flush_internal(msec_timeout);
    return flushed;
}
// Flushes the read chain from readinbuf into readoutbuf, records any
// chain error on the stream, and moves the produced data into inbuf.
// Returns the result of the chain's flush().
bool WvEncoderStream::flush_read()
{
    const bool flushed = readchain.flush(readinbuf, readoutbuf);

    // surface a read chain failure before handing the data on
    checkreadisok();
    inbuf.merge(readoutbuf);

    return flushed;
}
// Runs push() in flush mode without finishing the write chain and
// returns its result.
bool WvEncoderStream::flush_write()
{
    return push(true /*flush*/, false /*finish*/);
}
// Flushes and then finishes the read chain, records any chain error on
// the stream, and moves the produced data into inbuf.  Both chain calls
// are always made; the result is true only if both succeeded.
bool WvEncoderStream::finish_read()
{
    const bool flushed = readchain.flush(readinbuf, readoutbuf);
    const bool finished = readchain.finish(readoutbuf);

    checkreadisok();
    inbuf.merge(readoutbuf);

    return flushed && finished;
}
// Runs push() with both flush and finish requested and returns its
// result.
bool WvEncoderStream::finish_write()
{
    const bool finished = push(true /*flush*/, true /*finish*/);
    return finished;
}
void WvEncoderStream::pull(size_t size)
{
// fprintf(stderr, "encoder pull %d\n", size);
// pull a chunk of unencoded input
bool finish = false;
if (cloned)
{
if (size != 0)
cloned->read(readinbuf, size);
if (!cloned->isok())
finish = true; // underlying stream hit EOF or error
}
// deal with any encoders that have been added recently
WvDynBuf tmpbuf;
tmpbuf.merge(readoutbuf);
readchain.continue_encode(tmpbuf, readoutbuf);
// apenwarr 2004/11/06: always flush on read, because otherwise there's
// no clear way to decide when we need to flush. Anyway, most "decoders"
// (the kind of thing you'd put in the readchain) don't care whether you
// flush or not.
readchain.encode(readinbuf, readoutbuf, true);
//readchain.encode(readinbuf, readoutbuf, finish /*flush*/);
if (finish)
{
readchain.finish(readoutbuf);
// if (readoutbuf.used() == 0 && inbuf.used() == 0)
// noread();
close();
// otherwise defer EOF until the buffered data has been read
}
else if (!readoutbuf.used() && !inbuf.used() && readchain.isfinished())
{
// only get EOF when the chain is finished and we have no
// more data
//noread();
close();
}
checkreadisok();
}
bool WvEncoderStream::push(bool flush, bool finish)
{
WvDynBuf writeoutbuf;
// encode the output
if (flush)
writeinbuf.merge(outbuf);
bool success = writechain.encode(writeinbuf, writeoutbuf, flush);
if (finish)
if (!writechain.finish(writeoutbuf))
success = false;
checkwriteisok();
#if 0
// push encoded output to cloned stream
size_t size = writeoutbuf.used();
if (size != 0)
{
const unsigned char *writeout = writeoutbuf.get(size);
size_t len = WvStreamClone::uwrite(writeout, size);
writeoutbuf.unget(size - len);
}
#endif
if (cloned)
cloned->write(writeoutbuf, writeoutbuf.used());
return success;
}
// Copies up to `size` decoded bytes from readoutbuf into buf and
// returns how many were copied.
//
// When readoutbuf is empty and size is non-zero, pull() is called first
// to fetch more input, asking for min_readsize or size, whichever is
// larger.
size_t WvEncoderStream::uread(void *buf, size_t size)
{
    if (size != 0 && readoutbuf.used() == 0)
    {
        size_t want = size;
        if (min_readsize > want)
            want = min_readsize;
        pull(want);
    }

    // never hand out more than is actually buffered
    const size_t avail = readoutbuf.used();
    const size_t count = size < avail ? size : avail;
    readoutbuf.move(buf, count);
    return count;
}
// Appends `size` bytes from buf to writeinbuf and runs a plain push()
// (no flush, no finish).  All of the data is accepted, so the return
// value is always `size`; the result of push() is not reported.
size_t WvEncoderStream::uwrite(const void *buf, size_t size)
{
    const bool flush = false;
    const bool finish = false;

    writeinbuf.put(buf, size);
    push(flush, finish);
    return size;
}
// Lets WvStreamClone prepare the select, then sets the select timeout
// to zero when the caller wants readability and readoutbuf already
// holds data (presumably so the select does not wait for input that is
// already buffered here).
void WvEncoderStream::pre_select(SelectInfo &si)
{
    WvStreamClone::pre_select(si);

    if (!si.wants.readable)
        return;
    if (readoutbuf.used() == 0)
        return;

    si.msec_timeout = 0;
}
// Returns true when either this stream has decoded data waiting for a
// reader who asked for readability, or WvStreamClone::post_select()
// says so.  Pending output is also pushed toward the cloned stream on
// every call.
bool WvEncoderStream::post_select(SelectInfo &si)
{
    bool ready = false;

    // With buffered input and a caller interested in readability, make
    // sure a callback happens; it will hopefully come back for more
    // data through uread().
    if (si.wants.readable && readoutbuf.used() != 0)
    {
        pull(0); // try an encode without reading anything new
        ready = (readoutbuf.used() != 0);
    }

    // try to push pending encoded output to the cloned stream; the
    // outbuf_delayed_flush condition is already handled by uwrite()
    push(false /*flush*/, false /*finish*/);

    // finally, consult the underlying stream
    if (WvStreamClone::post_select(si))
        ready = true;

    return ready;
}
// If the read chain is in an error state, records a "read chain: ..."
// error on the stream using the chain's error text and calls noread().
// Does nothing while the chain is ok.
void WvEncoderStream::checkreadisok()
{
    if (readchain.isok())
        return;

    seterr(WvString("read chain: %s", readchain.geterror()));
    noread();
}
// If the write chain is in an error state, records a "write chain: ..."
// error on the stream using the chain's error text.  Does nothing while
// the chain is ok.
void WvEncoderStream::checkwriteisok()
{
    if (writechain.isok())
        return;

    seterr(WvString("write chain: %s", writechain.geterror()));
}
|