1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
|
/*
File: Rendezvous.java
Originally written by Doug Lea and released into the public domain.
This may be used for any purposes whatsoever without acknowledgment.
Thanks for the assistance and support of Sun Microsystems Labs,
and everyone contributing, testing, and using this code.
History:
Date Who What
11Jun1998 dl Create public version
30Jul1998 dl Minor code simplifications
*/
package EDU.oswego.cs.dl.util.concurrent;
/**
* A rendezvous is a barrier that:
* <ul>
* <li> Unlike a CyclicBarrier, is not restricted to use
* with fixed-sized groups of threads.
* Any number of threads can attempt to enter a rendezvous,
* but only the predetermined number of parties enter
* and later become released from the rendezvous at any give time.
* <li> Enables each participating thread to exchange information
* with others at the rendezvous point. Each entering thread
* presents some object on entry to the rendezvous, and
* returns some object on release. The object returned is
* the result of a RendezvousFunction that is run once per
* rendezvous, (it is run by the last-entering thread). By
* default, the function applied is a rotation, so each
* thread returns the object given by the next (modulo parties)
* entering thread. This default function faciliates simple
* application of a common use of rendezvous, as exchangers.
* </ul>
* <p>
* Rendezvous use an all-or-none breakage model
* for failed synchronization attempts: If threads
* leave a rendezvous point prematurely because of timeout
* or interruption, others will also leave abnormally
* (via BrokenBarrierException), until
* the rendezvous is <code>restart</code>ed. This is usually
* the simplest and best strategy for sharing knowledge
* about failures among cooperating threads in the most
* common usages contexts of Rendezvous.
* <p>
* While any positive number (including 1) of parties can
* be handled, the most common case is to have two parties.
* <p>
* <b>Sample Usage</b><p>
* Here are the highlights of a class that uses a Rendezvous to
* swap buffers between threads so that the thread filling the
* buffer gets a freshly
* emptied one when it needs it, handing off the filled one to
* the thread emptying the buffer.
* <pre>
* class FillAndEmpty {
* Rendezvous exchanger = new Rendezvous(2);
* Buffer initialEmptyBuffer = ... a made-up type
* Buffer initialFullBuffer = ...
*
* class FillingLoop implements Runnable {
* public void run() {
* Buffer currentBuffer = initialEmptyBuffer;
* try {
* while (currentBuffer != null) {
* addToBuffer(currentBuffer);
* if (currentBuffer.full())
* currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
* }
* }
* catch (BrokenBarrierException ex) {
* return;
* }
* catch (InterruptedException ex) {
* Thread.currentThread().interrupt();
* }
* }
* }
*
* class EmptyingLoop implements Runnable {
* public void run() {
* Buffer currentBuffer = initialFullBuffer;
* try {
* while (currentBuffer != null) {
* takeFromBuffer(currentBuffer);
* if (currentBuffer.empty())
* currentBuffer = (Buffer)(exchanger.rendezvous(currentBuffer));
* }
* }
* catch (BrokenBarrierException ex) {
* return;
* }
* catch (InterruptedException ex) {
* Thread.currentThread().interrupt();
* }
* }
* }
*
* void start() {
* new Thread(new FillingLoop()).start();
* new Thread(new EmptyingLoop()).start();
* }
* }
* </pre>
* <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
**/
public class Rendezvous implements Barrier {
/**
* Interface for functions run at rendezvous points
**/
public interface RendezvousFunction {
/**
* Perform some function on the objects presented at
* a rendezvous. The objects array holds all presented
* items; one per thread. Its length is the number of parties.
* The array is ordered by arrival into the rendezvous.
* So, the last element (at objects[objects.length-1])
* is guaranteed to have been presented by the thread performing
* this function. No identifying information is
* otherwise kept about which thread presented which item.
* If you need to
* trace origins, you will need to use an item type for rendezvous
* that includes identifying information. After return of this
* function, other threads are released, and each returns with
* the item with the same index as the one it presented.
**/
public void rendezvousFunction(Object[] objects);
}
/**
* The default rendezvous function. Rotates the array
* so that each thread returns an item presented by some
* other thread (or itself, if parties is 1).
**/
public static class Rotator implements RendezvousFunction {
/** Rotate the array **/
public void rendezvousFunction(Object[] objects) {
int lastIdx = objects.length - 1;
Object first = objects[0];
for (int i = 0; i < lastIdx; ++i) objects[i] = objects[i+1];
objects[lastIdx] = first;
}
}
protected final int parties_;
protected boolean broken_ = false;
/**
* Number of threads that have entered rendezvous
**/
protected int entries_ = 0;
/**
* Number of threads that are permitted to depart rendezvous
**/
protected long departures_ = 0;
/**
* Incoming threads pile up on entry until last set done.
**/
protected final Semaphore entryGate_;
/**
* Temporary holder for items in exchange
**/
protected final Object[] slots_;
/**
* The function to run at rendezvous point
**/
protected RendezvousFunction rendezvousFunction_;
/**
* Create a Barrier for the indicated number of parties,
* and the default Rotator function to run at each barrier point.
* @exception IllegalArgumentException if parties less than or equal to zero.
**/
public Rendezvous(int parties) {
this(parties, new Rotator());
}
/**
* Create a Barrier for the indicated number of parties.
* and the given function to run at each barrier point.
* @exception IllegalArgumentException if parties less than or equal to zero.
**/
public Rendezvous(int parties, RendezvousFunction function) {
if (parties <= 0) throw new IllegalArgumentException();
parties_ = parties;
rendezvousFunction_ = function;
entryGate_ = new WaiterPreferenceSemaphore(parties);
slots_ = new Object[parties];
}
/**
* Set the function to call at the point at which all threads reach the
* rendezvous. This function is run exactly once, by the thread
* that trips the barrier. The function is not run if the barrier is
* broken.
* @param function the function to run. If null, no function is run.
* @return the previous function
**/
public synchronized RendezvousFunction setRendezvousFunction(RendezvousFunction function) {
RendezvousFunction old = rendezvousFunction_;
rendezvousFunction_ = function;
return old;
}
public int parties() { return parties_; }
public synchronized boolean broken() { return broken_; }
/**
* Reset to initial state. Clears both the broken status
* and any record of waiting threads, and releases all
* currently waiting threads with indeterminate return status.
* This method is intended only for use in recovery actions
* in which it is somehow known
* that no thread could possibly be relying on the
* the synchronization properties of this barrier.
**/
public void restart() {
// This is not very good, but probably the best that can be done
for (;;) {
synchronized(this) {
if (entries_ != 0) {
notifyAll();
}
else {
broken_ = false;
return;
}
}
Thread.yield();
}
}
/**
* Enter a rendezvous; returning after all other parties arrive.
* @param x the item to present at rendezvous point.
* By default, this item is exchanged with another.
* @return an item x given by some thread, and/or processed
* by the rendezvousFunction.
* @exception BrokenBarrierException
* if any other thread
* in any previous or current barrier
* since either creation or the last <code>restart</code>
* operation left the barrier
* prematurely due to interruption or time-out. (If so,
* the <code>broken</code> status is also set.)
* Also returns as
* broken if the RendezvousFunction encountered a run-time exception.
* Threads that are noticed to have been
* interrupted <em>after</em> being released are not considered
* to have broken the barrier.
* In all cases, the interruption
* status of the current thread is preserved, so can be tested
* by checking <code>Thread.interrupted</code>.
* @exception InterruptedException if this thread was interrupted
* during the exchange. If so, <code>broken</code> status is also set.
**/
public Object rendezvous(Object x) throws InterruptedException, BrokenBarrierException {
return doRendezvous(x, false, 0);
}
/**
* Wait msecs to complete a rendezvous.
* @param x the item to present at rendezvous point.
* By default, this item is exchanged with another.
* @param msecs The maximum time to wait.
* @return an item x given by some thread, and/or processed
* by the rendezvousFunction.
* @exception BrokenBarrierException
* if any other thread
* in any previous or current barrier
* since either creation or the last <code>restart</code>
* operation left the barrier
* prematurely due to interruption or time-out. (If so,
* the <code>broken</code> status is also set.)
* Also returns as
* broken if the RendezvousFunction encountered a run-time exception.
* Threads that are noticed to have been
* interrupted <em>after</em> being released are not considered
* to have broken the barrier.
* In all cases, the interruption
* status of the current thread is preserved, so can be tested
* by checking <code>Thread.interrupted</code>.
* @exception InterruptedException if this thread was interrupted
* during the exchange. If so, <code>broken</code> status is also set.
* @exception TimeoutException if this thread timed out waiting for
* the exchange. If the timeout occured while already in the
* exchange, <code>broken</code> status is also set.
**/
public Object attemptRendezvous(Object x, long msecs)
throws InterruptedException, TimeoutException, BrokenBarrierException {
return doRendezvous(x, true, msecs);
}
protected Object doRendezvous(Object x, boolean timed, long msecs)
throws InterruptedException, TimeoutException, BrokenBarrierException {
// rely on semaphore to throw interrupt on entry
long startTime;
if (timed) {
startTime = System.currentTimeMillis();
if (!entryGate_.attempt(msecs)) {
throw new TimeoutException(msecs);
}
}
else {
startTime = 0;
entryGate_.acquire();
}
synchronized(this) {
Object y = null;
int index = entries_++;
slots_[index] = x;
try {
// last one in runs function and releases
if (entries_ == parties_) {
departures_ = entries_;
notifyAll();
try {
if (!broken_ && rendezvousFunction_ != null)
rendezvousFunction_.rendezvousFunction(slots_);
}
catch (RuntimeException ex) {
broken_ = true;
}
}
else {
while (!broken_ && departures_ < 1) {
long timeLeft = 0;
if (timed) {
timeLeft = msecs - (System.currentTimeMillis() - startTime);
if (timeLeft <= 0) {
broken_ = true;
departures_ = entries_;
notifyAll();
throw new TimeoutException(msecs);
}
}
try {
wait(timeLeft);
}
catch (InterruptedException ex) {
if (broken_ || departures_ > 0) { // interrupted after release
Thread.currentThread().interrupt();
break;
}
else {
broken_ = true;
departures_ = entries_;
notifyAll();
throw ex;
}
}
}
}
}
finally {
y = slots_[index];
// Last one out cleans up and allows next set of threads in
if (--departures_ <= 0) {
for (int i = 0; i < slots_.length; ++i) slots_[i] = null;
entryGate_.release(entries_);
entries_ = 0;
}
}
// continue if no IE/TO throw
if (broken_)
throw new BrokenBarrierException(index);
else
return y;
}
}
}
|