1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
package EDU.oswego.cs.dl.util.concurrent.misc;
import EDU.oswego.cs.dl.util.concurrent.*;
/**
 * A fixed-capacity FIFO channel backed by a circular array.
 *
 * All state is guarded by a single {@code Mutex}; two {@code CondVar}s
 * constructed on that mutex are used to park producers while the buffer is
 * full ({@code notFull}) and consumers while it is empty ({@code notEmpty}).
 *
 * Note that {@code null} is used by {@link #poll} and {@link #peek} to report
 * "nothing available", so a stored {@code null} element cannot be told apart
 * from that case by callers.
 */
public class CVBuffer implements BoundedChannel {
    private final Mutex mutex;
    private final CondVar notFull;
    private final CondVar notEmpty;

    /** Number of elements currently stored. */
    private int count = 0;
    /** Index of the next element to hand out. */
    private int takePtr = 0;
    /** Index of the next free slot. */
    private int putPtr = 0;
    private final Object[] array;

    /**
     * Creates a buffer holding at most {@code cap} elements.
     *
     * @param cap the capacity; must be positive
     * @throws IllegalArgumentException if {@code cap} is zero or negative
     *         (a zero-capacity buffer would be permanently both full and
     *         empty, so every put and take would block forever)
     */
    public CVBuffer(int cap) {
        if (cap <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + cap);
        }
        array = new Object[cap];
        mutex = new Mutex();
        notFull = new CondVar(mutex);
        notEmpty = new CondVar(mutex);
    }

    /** Creates a buffer whose capacity is {@code DefaultChannelCapacity.get()}. */
    public CVBuffer() {
        this(DefaultChannelCapacity.get());
    }

    /** Returns the maximum number of elements this buffer can hold. */
    public int capacity() { return array.length; }

    /**
     * Appends {@code x}, waiting for as long as the buffer is full.
     *
     * @param x the element to store
     * @throws InterruptedException if acquiring the mutex or waiting for
     *         space is interrupted
     */
    public void put(Object x) throws InterruptedException {
        mutex.acquire();
        try {
            while (count == array.length) {
                notFull.await();
            }
            insert(x);
        }
        finally {
            mutex.release();
        }
    }

    /**
     * Removes and returns the oldest element, waiting for as long as the
     * buffer is empty.
     *
     * @return the element at the head of the buffer
     * @throws InterruptedException if acquiring the mutex or waiting for an
     *         element is interrupted
     */
    public Object take() throws InterruptedException {
        mutex.acquire();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            return extract();
        }
        finally {
            mutex.release();
        }
    }

    /**
     * Appends {@code x} if space becomes available within {@code msecs}
     * milliseconds.
     *
     * @param x the element to store
     * @param msecs maximum time to wait for space; if zero or negative the
     *        call does not wait
     * @return {@code true} if the element was stored, {@code false} if the
     *         buffer was still full when the time ran out
     * @throws InterruptedException if acquiring the mutex or waiting for
     *         space is interrupted
     */
    public boolean offer(Object x, long msecs) throws InterruptedException {
        mutex.acquire();
        try {
            if (count == array.length) {
                // Like put(), recheck the condition after every wakeup: a
                // return from the wait does not guarantee a free slot. Unlike
                // put(), only keep waiting for whatever time is left.
                long start = System.currentTimeMillis();
                long remaining = msecs;
                do {
                    if (remaining <= 0) {
                        return false;
                    }
                    notFull.timedwait(remaining);
                    remaining = msecs - (System.currentTimeMillis() - start);
                } while (count == array.length);
            }
            insert(x);
            return true;
        }
        finally {
            mutex.release();
        }
    }

    /**
     * Removes and returns the oldest element if one becomes available within
     * {@code msecs} milliseconds.
     *
     * @param msecs maximum time to wait for an element; if zero or negative
     *        the call does not wait
     * @return the element at the head of the buffer, or {@code null} if the
     *         buffer was still empty when the time ran out
     * @throws InterruptedException if acquiring the mutex or waiting for an
     *         element is interrupted
     */
    public Object poll(long msecs) throws InterruptedException {
        mutex.acquire();
        try {
            if (count == 0) {
                // Same recheck-and-recompute loop as offer(), so an early
                // wakeup does not cut the caller's timeout short.
                long start = System.currentTimeMillis();
                long remaining = msecs;
                do {
                    if (remaining <= 0) {
                        return null;
                    }
                    notEmpty.timedwait(remaining);
                    remaining = msecs - (System.currentTimeMillis() - start);
                } while (count == 0);
            }
            return extract();
        }
        finally {
            mutex.release();
        }
    }

    /**
     * Returns the oldest element without removing it.
     *
     * @return the element at the head of the buffer, or {@code null} if the
     *         buffer is empty. {@code null} is also returned if the thread is
     *         interrupted while acquiring the mutex; in that case the
     *         thread's interrupt status is set again before returning.
     */
    public Object peek() {
        try {
            mutex.acquire();
            try {
                if (count == 0)
                    return null;
                else
                    return array[takePtr];
            }
            finally {
                mutex.release();
            }
        }
        catch (InterruptedException ex) {
            // peek() cannot throw InterruptedException, so preserve the
            // interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Stores {@code x} in the next free slot and signals {@code notEmpty}.
     * Caller must hold {@code mutex} and have checked that the buffer is not full.
     */
    private void insert(Object x) {
        array[putPtr] = x;
        putPtr = (putPtr + 1) % array.length;
        ++count;
        notEmpty.signal();
    }

    /**
     * Removes the head element, clears its slot and signals {@code notFull}.
     * Caller must hold {@code mutex} and have checked that the buffer is not empty.
     */
    private Object extract() {
        Object x = array[takePtr];
        array[takePtr] = null; // drop the reference so the buffer does not retain it
        takePtr = (takePtr + 1) % array.length;
        --count;
        notFull.signal();
        return x;
    }
}
|