1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
#include "stdafx.h"
#include "Pipe.h"
namespace storm {
Pipe::Pipe() {
init(1024 * 4);
}
Pipe::Pipe(Nat size) {
init(size);
}
void Pipe::init(Nat size) {
data = runtime::allocArray<Byte>(engine(), &byteArrayType, size);
readPos = 0;
filled = 0;
readClosed = false;
writeClosed = false;
lock = new (this) Lock();
waitRead = new (this) Event();
waitWrite = new (this) Event();
waitWrite->set();
waitRead->clear();
}
IStream *Pipe::input() {
return new (this) PipeIStream(this);
}
OStream *Pipe::output() {
return new (this) PipeOStream(this);
}
void Pipe::close() {
closeRead();
closeWrite();
}
void Pipe::closeRead() {
readClosed = true;
waitWrite->set();
}
void Pipe::closeWrite() {
writeClosed = true;
waitRead->set();
}
Bool Pipe::more() {
Lock::Guard z(lock);
return !writeClosed;
}
void Pipe::read(Buffer to) {
readCommon(to, true);
}
void Pipe::peek(Buffer to) {
readCommon(to, false);
}
void Pipe::readCommon(Buffer to, Bool updatePos) {
while (true) {
// Wait until we can read...
waitRead->wait();
Lock::Guard z(lock);
// Spurious wakeup or closed?
if (filled <= 0) {
if (writeClosed)
return;
// Make sure we wait.
waitRead->clear();
continue;
}
Nat toRead = min(to.free(), filled);
if (readPos + toRead > data->count) {
// Need two memcpy.
Nat first = Nat(data->count) - readPos;
memcpy(to.dataPtr() + to.filled(), data->v + readPos, first);
memcpy(to.dataPtr() + to.filled() + first, data->v, toRead - first);
} else {
// One is enough.
memcpy(to.dataPtr() + to.filled(), data->v + readPos, toRead);
}
to.filled(to.filled() + toRead);
if (updatePos) {
filled -= toRead;
readPos += toRead;
if (readPos >= data->count)
readPos -= Nat(data->count);
// Now we can write more data!
waitWrite->set();
// Maybe we can't read anymore.
if (filled == 0)
waitRead->clear();
}
// Done!
return;
}
}
Nat Pipe::write(Buffer from, Nat start) {
Nat consumed = 0;
while (from.filled() > start) {
waitWrite->wait();
Lock::Guard z(lock);
// If the read end is closed, just stop writing.
if (readClosed)
return consumed;
// Any room to write? If not, wait a bit more. It could be a spurious wakeup.
if (filled >= data->count) {
// Make sure we wait.
waitWrite->clear();
continue;
}
Nat toWrite = min(Nat(data->count) - filled, from.filled() - start);
Nat tail = readPos + filled;
if (tail >= data->count)
tail -= Nat(data->count);
// Take wrapping into account.
toWrite = min(Nat(data->count) - tail, toWrite);
memcpy(data->v + tail, from.dataPtr() + start, toWrite);
filled += toWrite;
start += toWrite;
consumed += toWrite;
// Update our state:
if (filled >= data->count)
waitWrite->clear();
// It is possible to read now, we put data there.
waitRead->set();
}
return consumed;
}
}
|