1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
package fileIO;
import java.util.Arrays;
import shared.Shared;
import shared.Tools;
/**
 * Reads one object of type X from a file on a background thread.
 * <p>
 * All instances share the {@link #activeThreads} counters, which cap how many
 * threads may be inside {@code ReadWrite.read} at the same time (see {@link #LIMIT}).
 * A thread is counted as "waiting" from construction until it acquires a running
 * slot in {@link #run()}, and as "running" until its read completes or fails.
 *
 * @author Brian Bushnell
 * @date Jan 2, 2013
 *
 */
public class LoadThread<X> extends Thread{

	/**
	 * Creates a LoadThread for the given file and starts it immediately.
	 * @param fname Name of the file to read.
	 * @param c Class token passed through to {@code ReadWrite.read}.
	 * @return The already-started thread; its result appears in {@link #output}.
	 */
	public static <Y> LoadThread<Y> load(String fname, Class<Y> c){
		LoadThread<Y> lt=new LoadThread<Y>(fname, c);
		lt.start();
		return lt;
	}

	/** Registers the new thread as active/waiting; instances are created only via {@link #load}. */
	private LoadThread(String fname_, Class<X> c_){
		fname=fname_;
		c=c_;
		addThread(1);
	}

	/**
	 * Acquires a running slot (blocking while the limit is reached), performs the read,
	 * then releases the slot and wakes anything waiting on this thread object.
	 */
	@Override
	public void run(){
		addRunningThread(1);
		try{
			output=ReadWrite.read(c, fname, false);
		}finally{
			//Always release the slot, even if the read throws; otherwise the shared
			//counters never return to zero and other waiters can block forever.
			addRunningThread(-1);
			synchronized(this){this.notifyAll();}
		}
	}

	/**
	 * Adjusts the number of active threads.
	 * A positive x registers x new waiting threads; a negative x is delegated to
	 * {@link #addRunningThread(int)}, which removes running threads.
	 * @param x Nonzero change in thread count.
	 * @return The number of active threads after the change.
	 */
	private static final int addThread(int x){
		final int lim=currentLimit();
		synchronized(activeThreads){
			assert(x!=0);
			if(x>0){
				activeThreads[0]+=x;
				activeThreads[1]+=x;
			}else{
				addRunningThread(x); //Monitor is reentrant, so nesting is safe
			}
			assert(countersConsistent(lim)) : Arrays.toString(activeThreads);
			return activeThreads[0];
		}
	}

	/**
	 * Moves threads between the waiting and running states.
	 * A positive x blocks until the number of running threads is below the limit, then
	 * converts x waiting threads into running threads. A negative x retires running
	 * threads, removing them from the active count as well.
	 * @param x Nonzero change in the number of running threads.
	 * @return The number of running threads after the change.
	 */
	private static final int addRunningThread(int x){
		final int lim=currentLimit();
		synchronized(activeThreads){
			assert(x!=0);
			if(x>0){
				assert(activeThreads[1]>=x);
				while(activeThreads[2]>=lim){
					try {
						activeThreads.wait();
					} catch (InterruptedException e) {
						//Interruption is logged and the thread keeps waiting for a slot.
						e.printStackTrace();
					}
				}
				activeThreads[1]-=x; //Remove from waiting
			}else{
				activeThreads[0]+=x; //Remove from active
			}
			activeThreads[2]+=x; //Change number running
			assert(countersConsistent(lim)) : Arrays.toString(activeThreads);
			if(shouldWakeWaiters(lim)){
				//Slot-waiters and waitForReadingToFinish() callers share this monitor,
				//so wake all of them; each one re-checks its own condition in a loop.
				activeThreads.notifyAll();
			}
			return activeThreads[2];
		}
	}

	/** @return The number of active (waiting plus running) threads. */
	public static final int countActiveThreads(){
		final int lim=currentLimit();
		synchronized(activeThreads){
			assert(countersConsistent(lim)) : Arrays.toString(activeThreads);
			return activeThreads[0];
		}
	}

	/**
	 * Blocks the caller until no LoadThreads are active.
	 * The wait is timed (8 seconds per iteration) so the condition is re-evaluated
	 * periodically even if a notification is missed.
	 */
	public static final void waitForReadingToFinish(){
		final int lim=currentLimit();
		synchronized(activeThreads){
			while(activeThreads[0]>0){
				assert(countersConsistent(lim)) : Arrays.toString(activeThreads);
				try {
					activeThreads.wait(8000);
				} catch (InterruptedException e) {
					//Interruption is logged and the caller keeps waiting.
					e.printStackTrace();
				}
				if(shouldWakeWaiters(lim)){activeThreads.notifyAll();}
			}
		}
	}

	/**
	 * Blocks the caller until this thread has terminated, unless {@link #output}
	 * is already non-null, in which case it returns immediately.
	 */
	public final void waitForThisToFinish(){
		if(output==null){
			while(this.getState()!=State.TERMINATED){
				try {
					this.join();
				} catch (InterruptedException e) {
					//Interruption is logged and the join is retried.
					e.printStackTrace();
				}
			}
		}
	}

	/** @return The maximum number of concurrently running threads: 1 if Shared.LOW_MEMORY is set, otherwise LIMIT. */
	private static int currentLimit(){
		return Shared.LOW_MEMORY ? 1 : LIMIT;
	}

	/**
	 * Checks the counter invariant: active==waiting+running, no counter is negative,
	 * and running does not exceed lim. Must be called while holding the activeThreads monitor.
	 */
	private static boolean countersConsistent(int lim){
		return activeThreads[0]==(activeThreads[1]+activeThreads[2]) && activeThreads[0]>=0 && activeThreads[1]>=0 &&
				activeThreads[2]>=0 && activeThreads[2]<=lim;
	}

	/**
	 * True when nothing is running, or when a slot is free and some thread is waiting for one.
	 * Must be called while holding the activeThreads monitor.
	 */
	private static boolean shouldWakeWaiters(int lim){
		return activeThreads[2]==0 || (activeThreads[2]<lim && activeThreads[1]>0);
	}

	/** {active, waiting, running} <br>
	 * Active means running or waiting. <br>
	 * Also serves as the monitor guarding these counters, so it must not be reassigned.
	 */
	public static int[] activeThreads={0, 0, 0};

	/** Name of the file to read. */
	private final String fname;
	/** Class token passed to ReadWrite.read. */
	private final Class<X> c;
	/** Result of the read; null until the read has completed. */
	public X output=null;

	/**
	 * Maximum number of threads allowed to read concurrently when Shared.LOW_MEMORY is false.
	 * Initialized from Shared.threads(), apparently clamped to the range 1 to 12
	 * (assuming Tools.min and Tools.max have the conventional meaning).
	 */
	public static int LIMIT=Tools.min(12, Tools.max(Shared.threads(), 1));
}
|