File: FJTaskRunnerGroup.java

package info (click to toggle)
concurrent-dfsg 1.3.4-4
  • links: PTS, VCS
  • area: main
  • in suites: buster, jessie, jessie-kfreebsd, squeeze, stretch, wheezy
  • size: 976 kB
  • ctags: 2,018
  • sloc: java: 10,704; xml: 49; makefile: 12
file content (618 lines) | stat: -rw-r--r-- 20,499 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
/*
  File: FJTaskRunnerGroup.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who                What
  7Jan1999   dl                 First public release
  12Jan1999  dl                 made getActiveCount public; misc minor cleanup.
  14Jan1999  dl                 Added executeTask
  20Jan1999  dl                 Allow use of priorities; reformat stats
  6Feb1999   dl                 Lazy thread starts
  27Apr1999  dl                 Renamed
*/

package EDU.oswego.cs.dl.util.concurrent;

/**
 * A stripped down analog of a ThreadGroup used for
 * establishing and managing FJTaskRunner threads.
 * ThreadRunnerGroups serve as the control boundary separating
 * the general world of normal threads from the specialized world
 * of FJTasks. 
 * <p>
 * By intent, this class does not subclass java.lang.ThreadGroup, and
 * does not support most methods found in ThreadGroups, since they
 * would make no sense for FJTaskRunner threads. In fact, the class
 * does not deal with ThreadGroups at all. If you want to restrict
 * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
 * it from within that ThreadGroup.
 * <p>
 * The main contextual parameter for a FJTaskRunnerGroup is
 * the group size, established in the constructor. 
 * Groups must be of a fixed size.
 * There is no way to dynamically increase or decrease the number
 * of threads in an existing group.
 * <p>
 * In general, the group size should be equal to the number
 * of CPUs on the system. (Unfortunately, there is no portable
 * means of automatically detecting the number of CPUs on a JVM, so there is
 * no good way to automate defaults.)  In principle, when
 * FJTasks are used for computation-intensive tasks, having only 
 * as many threads as CPUs should minimize bookkeeping overhead
 * and contention, and so maximize throughput. However, because
 * FJTaskRunners lie atop Java threads, and in turn operating system
 * thread support and scheduling policies, 
 * it is very possible that using more threads
 * than CPUs will improve overall throughput even though it adds
 * to overhead. This will always be so if FJTasks are I/O bound.
 * So it may pay to experiment a bit when tuning on particular platforms.
 * You can also use <code>setRunPriorities</code> to either
 * increase or decrease the priorities of active threads, which
 * may interact with group size choice.
 * <p>
 * In any case, overestimating group sizes never
 * seriously degrades performance (at least within reasonable bounds). 
 * You can also use a value
 * less than the number of CPUs in order to reserve processing
 * for unrelated threads. 
 * <p>
 * There are two general styles for using a FJTaskRunnerGroup.
 * You can create one group per entire program execution, for example 
 * as a static singleton, and use it for all parallel tasks:
 * <pre>
 * class Tasks {
 *   static FJTaskRunnerGroup group;
 *   public void initialize(int groupsize) {
 *      group = new FJTaskRunnerGroup(groupSize);
 *   }
 *   // ...
 * }
 * </pre>
 * Alternatively, you can make new groups on the fly and use them only for
 * particular task sets. This is more flexible,,
 * and leads to more controllable and deterministic execution patterns,
 * but it encounters greater overhead on startup. Also, to reclaim
 * system resources, you should
 * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
 * using one-shot groups. Otherwise, because FJTaskRunners set 
 * <code>Thread.isDaemon</code>
 * status, they will not normally be reclaimed until program termination.
 * <p>
 * The main supported methods are <code>execute</code>,
 * which starts a task processed by FJTaskRunner threads,
 * and <code>invoke</code>, which starts one and waits for completion.
 * For example, you might extend the above <code>FJTasks</code>
 * class to support a task-based computation, say, the
 * <code>Fib</code> class from the <code>FJTask</code> documentation:
 * <pre>
 * class Tasks { // continued
 *   // ...
 *   static int fib(int n) {
 *     try {
 *       Fib f = new Fib(n);
 *       group.invoke(f);
 *       return f.getAnswer();
 *     }
 *     catch (InterruptedException ex) {
 *       throw new Error("Interrupted during computation");
 *     }
 *   }
 * }
 * </pre>
 * <p>
 * Method <code>stats()</code> can be used to monitor performance.
 * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
 * the compile-time constant COLLECT_STATS set to false. In this
 * case, various simple counts reported in stats() are not collected.
 * On platforms tested,
 * this leads to such a tiny performance improvement that there is 
 * very little motivation to bother.
 *
 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 * <p>
 * @see FJTask
 * @see FJTaskRunner
 **/

public class FJTaskRunnerGroup implements Executor {

  /** The threads in this group **/
  protected final FJTaskRunner[] threads;

  /** Group-wide queue for tasks entered via execute() **/
  protected final LinkedQueue entryQueue = new LinkedQueue();

  /** Number of threads that are not waiting for work **/
  protected int activeCount = 0;

  /** Number of threads that have been started. Used to avoid
      unecessary contention during startup of task sets.
  **/
  protected int nstarted = 0;

  /**
   * Compile-time constant. If true, various counts of
   * runs, waits, etc., are maintained. These are NOT
   * updated with synchronization, so statistics reports
   * might not be accurate.
   **/
  
  static final boolean COLLECT_STATS = true;
  //  static final boolean COLLECT_STATS = false;

  // for stats

  /** The time at which this ThreadRunnerGroup was constructed **/
  long initTime = 0;

  /** Total number of executes or invokes **/
  int entries = 0;

  static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;

  /** 
   * Create a FJTaskRunnerGroup with the indicated number
   * of FJTaskRunner threads. Normally, the best size to use is
   * the number of CPUs on the system. 
   * <p>
   * The threads in a FJTaskRunnerGroup are created with their
   * isDaemon status set, so do not normally need to be
   * shut down manually upon program termination.
   **/

  public FJTaskRunnerGroup(int groupSize) { 
    threads = new FJTaskRunner[groupSize];
    initializeThreads();
    initTime = System.currentTimeMillis();
  }

  /**
   * Arrange for execution of the given task
   * by placing it in a work queue. If the argument
   * is not of type FJTask, it is embedded in a FJTask via 
   * <code>FJTask.Wrap</code>.
   * @exception InterruptedException if current Thread is
   * currently interrupted 
   **/

  public void execute(Runnable r) throws InterruptedException {
    if (r instanceof FJTask) {
      entryQueue.put((FJTask)r);
    }
    else {
      entryQueue.put(new FJTask.Wrap(r));
    }
    signalNewTask();
  }


  /**
   * Specialized form of execute called only from within FJTasks
   **/
  public void executeTask(FJTask t) {
    try {
      entryQueue.put(t);
      signalNewTask();
    }
    catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }


  /**
   * Start a task and wait it out. Returns when the task completes.
   * @exception InterruptedException if current Thread is
   * interrupted before completion of the task.
   **/

  public void invoke(Runnable r) throws InterruptedException {
    InvokableFJTask w = new InvokableFJTask(r);
    entryQueue.put(w);
    signalNewTask();
    w.awaitTermination();
  }


  /**
   * Try to shut down all FJTaskRunner threads in this group
   * by interrupting them all. This method is designed
   * to be used during cleanup when it is somehow known
   * that all threads are idle.
   * FJTaskRunners only
   * check for interruption when they are not otherwise
   * processing a task (and its generated subtasks,
   * if any), so if any threads are active, shutdown may
   * take a while, and may lead to unpredictable
   * task processing.
   **/

  public void interruptAll() {
    // paranoically interrupt current thread last if in group.
    Thread current = Thread.currentThread();
    boolean stopCurrent = false;

    for (int i = 0; i < threads.length; ++i) {
      Thread t = threads[i];
      if (t == current) 
        stopCurrent = true;
      else
        t.interrupt();
    }
    if (stopCurrent)
      current.interrupt();
  }


  /**
   * Set the priority to use while a FJTaskRunner is
   * polling for new tasks to perform. Default
   * is currently Thread.MIN_PRIORITY+1. The value
   * set may not go into effect immediately, but
   * will be used at least the next time a thread scans for work.
   **/
  public synchronized void setScanPriorities(int pri) {
    for (int i = 0; i < threads.length; ++i) {
      FJTaskRunner t = threads[i];
      t.setScanPriority(pri);
      if (!t.active) t.setPriority(pri);
    }
  }


  /**
   * Set the priority to use while a FJTaskRunner is
   * actively running tasks. Default
   * is the priority that was in effect by the thread that
   * constructed this FJTaskRunnerGroup. Setting this value
   * while threads are running may momentarily result in
   * them running at this priority even when idly waiting for work.
   **/
  public synchronized void setRunPriorities(int pri) {
    for (int i = 0; i < threads.length; ++i) {
      FJTaskRunner t = threads[i];
      t.setRunPriority(pri);
      if (t.active) t.setPriority(pri);
    }
  }

    

  /** Return the number of FJTaskRunner threads in this group **/

  public int size() { return threads.length; }


  /** 
   * Return the number of threads that are not idly waiting for work.
   * Beware that even active threads might not be doing any useful
   * work, but just spinning waiting for other dependent tasks.
   * Also, since this is just a snapshot value, some tasks
   * may be in the process of becoming idle.
   **/
  public synchronized int getActiveCount() { return activeCount; }

  /**
   * Prints various snapshot statistics to System.out.
   * <ul>
   *   <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
   *         <em>n</em> from zero to group size - 1):
   *     <ul>
   *       <li> A star "*" is printed if the thread is currently active;
   *            that is, not sleeping while waiting for work. Because
   *            threads gradually enter sleep modes, an active thread
   *            may in fact be about to sleep (or wake up).
   *       <li> <em>Q Cap</em> The current capacity of its task queue.
   *       <li> <em>Run</em> The total number of tasks that have been run.
   *       <li> <em>New</em> The number of these tasks that were
   *               taken from either the entry queue or from other 
   *               thread queues; that is, the number of tasks run
   *               that were <em>not</em> forked by the thread itself.
   *       <li> <em>Scan</em> The number of times other task
   *               queues or the entry queue were polled for tasks.
   *     </ul>
   *   <li> <em>Execute</em> The total number of tasks entered
   *        (but not necessarily yet run) via execute or invoke.
   *   <li> <em>Time</em> Time in seconds since construction of this
   *         FJTaskRunnerGroup.
   *   <li> <em>Rate</em> The total number of tasks processed
   *          per second across all threads. This
   *          may be useful as a simple throughput indicator
   *          if all processed tasks take approximately the
   *          same time to run.
   * </ul>
   * <p>
   * Cautions: Some statistics are updated and gathered 
   * without synchronization,
   * so may not be accurate. However, reported counts may be considered
   * as lower bounds of actual values. 
   * Some values may be zero if classes are compiled
   * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
   * classes can be independently compiled with different values of
   * COLLECT_STATS.) Also, the counts are maintained as ints so could
   * overflow in exceptionally long-lived applications.
   * <p>
   * These statistics can be useful when tuning algorithms or diagnosing
   * problems. For example:
   * <ul>
   *  <li> High numbers of scans may mean that there is insufficient
   *      parallelism to keep threads busy. However, high scan rates
   *      are expected if the number
   *      of Executes is also high or there is a lot of global
   *      synchronization in the application, and the system is not otherwise
   *      busy. Threads may scan
   *      for work hundreds of times upon startup, shutdown, and
   *      global synch points of task sets.
   *  <li> Large imbalances in tasks run across different threads might
   *      just reflect contention with unrelated threads on a system
   *      (possibly including JVM threads such as GC), but may also
   *      indicate some systematic bias in how you generate tasks.
   *  <li> Large task queue capacities may mean that too many tasks are being
   *     generated before they can be run. 
   *     Capacities are reported rather than current numbers of tasks
   *     in queues because they are better indicators of the existence
   *     of these kinds of possibly-transient problems.
   *     Queue capacities are
   *     resized on demand from their initial value of 4096 elements,
   *     which is much more than sufficient for the kinds of 
   *     applications that this framework is intended to best support.
   * </ul>
   **/

  public void stats() {
    long time = System.currentTimeMillis() - initTime;
    double secs = ((double)time) / 1000.0;
    long totalRuns = 0;
    long totalScans = 0;
    long totalSteals = 0;

    System.out.print("Thread" +
                     "\tQ Cap" +
                       "\tScans" +
                       "\tNew" +
                       "\tRuns" +
                       "\n");

    for (int i = 0; i < threads.length; ++i) {
      FJTaskRunner t = threads[i];
      int truns = t.runs;
      totalRuns += truns;

      int tscans = t.scans;
      totalScans += tscans;

      int tsteals = t.steals;
      totalSteals += tsteals;

      String star = (getActive(t))? "*" : " ";


      System.out.print("T" + i + star +
                       "\t" + t.deqSize() +
                       "\t" + tscans +
                       "\t" + tsteals +
                       "\t" + truns +
                       "\n");
    }

    System.out.print("Total" +
                     "\t    " +
                     "\t" + totalScans +
                     "\t" + totalSteals +
                     "\t" + totalRuns +
                     "\n");

    System.out.print("Execute: " + entries); 
    
    System.out.print("\tTime: " + secs);

    long rps = 0;
    if (secs != 0) rps = Math.round((double)(totalRuns) / secs);

    System.out.println("\tRate: " + rps);
  }


  /* ------------ Methods called only by FJTaskRunners ------------- */


  /**
   * Return the array of threads in this group. 
   * Called only by FJTaskRunner.scan().
   **/

  protected FJTaskRunner[] getArray() { return threads; }


  /**
   * Return a task from entry queue, or null if empty.
   * Called only by FJTaskRunner.scan().
   **/

  protected FJTask pollEntryQueue() {
    try {
      FJTask t = (FJTask)(entryQueue.poll(0));
      return t;
    }
    catch(InterruptedException ex) { // ignore interrupts
      Thread.currentThread().interrupt();
      return null;
    }
  }


  /**
   * Return active status of t.
   * Per-thread active status can only be accessed and
   * modified via synchronized method here in the group class.
   **/

  protected synchronized boolean getActive(FJTaskRunner t) {
    return t.active;
  }


  /**
   * Set active status of thread t to true, and notify others
   * that might be waiting for work. 
   **/

  protected synchronized void setActive(FJTaskRunner t) {
    if (!t.active) { 
      t.active = true;
      ++activeCount;
      if (nstarted < threads.length) 
        threads[nstarted++].start();
      else
        notifyAll();
    }
  }

  /**
   * Set active status of thread t to false.
   **/

  protected synchronized void setInactive(FJTaskRunner t) {
    if (t.active) { 
      t.active = false;
      --activeCount;
    }
  }

  /**
   * The number of times to scan other threads for tasks 
   * before transitioning to a mode where scans are
   * interleaved with sleeps (actually timed waits).
   * Upon transition, sleeps are for duration of
   * scans / SCANS_PER_SLEEP milliseconds.
   * <p>
   * This is not treated as a user-tunable parameter because
   * good values do not appear to vary much across JVMs or
   * applications. Its main role is to help avoid some
   * useless spinning and contention during task startup.
   **/
  static final long SCANS_PER_SLEEP = 15;

  /**
   * The maximum time (in msecs) to sleep when a thread is idle,
   * yet others are not, so may eventually generate work that
   * the current thread can steal. This value reflects the maximum time
   * that a thread may sleep when it possibly should not, because there
   * are other active threads that might generate work. In practice,
   * designs in which some threads become stalled because others
   * are running yet not generating tasks are not likely to work
   * well in this framework anyway, so the exact value does not matter
   * too much. However, keeping it in the sub-second range does
   * help smooth out startup and shutdown effects.
   **/

  static final long MAX_SLEEP_TIME = 100;

  /**
   * Set active status of thread t to false, and
   * then wait until: (a) there is a task in the entry 
   * queue, or (b) other threads are active, or (c) the current
   * thread is interrupted. Upon return, it
   * is not certain that there will be work available.
   * The thread must itself check. 
   * <p>
   * The main underlying reason
   * for these mechanics is that threads do not
   * signal each other when they add elements to their queues.
   * (This would add to task overhead, reduce locality.
   * and increase contention.)
   * So we must rely on a tamed form of polling. However, tasks
   * inserted into the entry queue do result in signals, so
   * tasks can wait on these if all of them are otherwise idle.
   **/

  protected synchronized void checkActive(FJTaskRunner t, long scans) {

    setInactive(t);

    try {
      // if nothing available, do a hard wait
      if (activeCount == 0 && entryQueue.peek() == null) { 
        wait();
      }
      else { 
        // If there is possibly some work,
        // sleep for a while before rechecking 

        long msecs = scans / SCANS_PER_SLEEP;
        if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
        int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
        wait(msecs, nsecs);
      }
    }
    catch (InterruptedException ex) {
      notify(); // avoid lost notifies on interrupts
      Thread.currentThread().interrupt();
    }
  }

  /* ------------ Utility methods  ------------- */

  /**
   * Start or wake up any threads waiting for work
   **/

  protected synchronized void signalNewTask() {
    if (COLLECT_STATS) ++entries;
    if (nstarted < threads.length) 
       threads[nstarted++].start();
    else
      notify();
  }

  /**
   * Create all FJTaskRunner threads in this group.
   **/

  protected void initializeThreads() {
    for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
  }




  /**
   * Wrap wait/notify mechanics around a task so that
   * invoke() can wait it out 
   **/
  protected static final class InvokableFJTask extends FJTask {
    protected final Runnable wrapped;
    protected boolean terminated = false;

    protected InvokableFJTask(Runnable r) { wrapped = r; }

    public void run() {
      try {
        if (wrapped instanceof FJTask)
          FJTask.invoke((FJTask)(wrapped));
        else
          wrapped.run();
      }
      finally {
        setTerminated();
      }
    }

    protected synchronized void setTerminated() {
      terminated = true;
      notifyAll(); 
    }

    protected synchronized void awaitTermination() throws InterruptedException {
      while (!terminated) wait();
    }
  }


}