1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
package org.perl.inline.java ;
import java.util.* ;
import java.io.* ;
/*
Queue for callbacks to Perl...
*/
class InlineJavaCallbackQueue {
    private static final long NANOS_PER_MILLI = 1000000L ;

    // Pending callbacks, oldest first. Only accessed while holding this object's monitor,
    // except for the unsynchronized size read in GetSize().
    private final List<InlineJavaCallback> queue = new ArrayList<>() ;
    // Set by InterruptWaitForCallback() and reset at the start of every
    // WaitForCallback(long, int). Only accessed while holding this object's monitor.
    private boolean wait_interrupted = false ;
    // volatile because OpenCallbackStream() and IsStreamOpen() access it without
    // holding the monitor.
    private volatile boolean stream_opened = false ;


    InlineJavaCallbackQueue(){
    }


    /*
        Appends a callback to the queue and wakes up every thread blocked in
        WaitForCallback()/ProcessNextCallback() so that it can re-check the queue.
    */
    synchronized void EnqueueCallback(InlineJavaCallback ijc){
        queue.add(ijc) ;
        notifyAll() ;
    }


    /*
        Removes and returns the oldest queued callback, or null if the queue is empty.
    */
    synchronized private InlineJavaCallback DequeueCallback(){
        if (queue.isEmpty()){
            return null ;
        }
        return queue.remove(0) ;
    }


    /*
        Same as WaitForCallback(long, int), with the timeout expressed in
        (fractional) seconds. The value is split into whole milliseconds and
        the remaining nanoseconds.
    */
    synchronized int WaitForCallback(double timeout){
        long secs = (long)Math.floor(timeout) ;
        double rest = timeout - ((double)secs) ;

        long millis = (long)Math.floor(rest * 1000.0) ;
        rest = (rest * 1000.0) - ((double)millis) ;

        int nanos = (int)Math.floor(rest * 1000000.0) ;

        return WaitForCallback((secs * 1000) + millis, nanos) ;
    }


    /*
        Blocks up to the specified time for the next callback to arrive.
        A timeout of 0 millis and 0 nanos waits without a time limit.
        Returns -1 if the stream is not open or if the wait was interrupted
        voluntarily (see CloseCallbackStream()), 0 on timeout, or the number
        of queued callbacks (> 0) if at least one is available.
        Throws IllegalArgumentException if it actually has to wait and millis
        is negative or nanos is outside 0-999999.
    */
    synchronized int WaitForCallback(long millis, int nanos){
        wait_interrupted = false ;
        Thread t = Thread.currentThread() ;
        InlineJavaUtils.debug(3, "waiting for callback request (" + millis + " millis, " +
            nanos + " nanos) in " + t.getName() + "...") ;

        if (! stream_opened){
            return -1 ;
        }

        if (IsEmpty()){
            AwaitCallback(millis, nanos) ;
        }
        InlineJavaUtils.debug(3, "waiting for callback request finished " + t.getName() + "...") ;

        // The stream is only ever closed through CloseCallbackStream(), which also
        // raises wait_interrupted. Checking stream_opened as well keeps the result
        // correct if another thread re-entered this method (resetting the flag)
        // before this thread got the monitor back.
        if ((wait_interrupted)||(! stream_opened)){
            return -1 ;
        }
        return GetSize() ;
    }


    /*
        Waits on this object's monitor until a callback is queued, the wait is
        interrupted voluntarily, the stream is closed, or the timeout elapses.
        The wait condition is re-checked after every wakeup, so a spurious
        wakeup, or a callback consumed by another thread, does not end the
        wait early. Must be called with the monitor held.
    */
    synchronized private void AwaitCallback(long millis, int nanos){
        boolean forever = ((millis == 0)&&(nanos == 0)) ;
        long budget = (forever ? 0L : TimeoutNanos(millis, nanos)) ;
        long start = System.nanoTime() ;

        while ((stream_opened)&&(! wait_interrupted)&&(IsEmpty())){
            try {
                if (forever){
                    wait() ;
                }
                else {
                    long left = budget - (System.nanoTime() - start) ;
                    if (left <= 0){
                        // Timed out.
                        return ;
                    }
                    // left > 0 here, so this can never degenerate into wait(0, 0),
                    // which would block without a time limit.
                    wait(left / NANOS_PER_MILLI, (int)(left % NANOS_PER_MILLI)) ;
                }
            }
            catch (InterruptedException ignored){
                // Deliberately ignored: a thread interrupt is not a reason to stop
                // waiting for callbacks. Loop and wait() for whatever time is left.
            }
        }
    }


    /*
        Validates a (millis, nanos) timeout the same way Object.wait(long, int)
        does and converts it to nanoseconds, saturating at Long.MAX_VALUE.
    */
    private static long TimeoutNanos(long millis, int nanos){
        if (millis < 0){
            throw new IllegalArgumentException("timeout value is negative") ;
        }
        if ((nanos < 0)||(nanos > 999999)){
            throw new IllegalArgumentException("nanosecond timeout value out of range") ;
        }
        if (millis > (Long.MAX_VALUE - nanos) / NANOS_PER_MILLI){
            return Long.MAX_VALUE ;
        }
        return (millis * NANOS_PER_MILLI) + nanos ;
    }


    /*
        Waits indefinitely for the next callback to arrive and executes it.
        Returns true on success or false if the wait was interrupted voluntarily
        or the stream is not open.
        The monitor is held while the callback is processed.
    */
    synchronized boolean ProcessNextCallback() throws InlineJavaException, InlineJavaPerlException {
        int rc = WaitForCallback(0, 0) ;
        if (rc == -1){
            // Wait was interrupted
            return false ;
        }

        // DequeueCallback can't return null here: an unlimited WaitForCallback only
        // returns a value other than -1 once the queue is non-empty, and the monitor
        // has been held ever since, so no other thread can have emptied the queue.
        Thread t = Thread.currentThread() ;
        InlineJavaUtils.debug(3, "processing callback request in " + t.getName() + "...") ;

        InlineJavaCallback ijc = DequeueCallback() ;
        ijc.Process() ;
        ijc.NotifyOfResponse(t) ;

        return true ;
    }


    private boolean IsEmpty(){
        return (GetSize() == 0) ;
    }


    /*
        Marks the callback stream as open so that WaitForCallback() will block
        for callbacks instead of returning -1 immediately.
    */
    void OpenCallbackStream(){
        stream_opened = true ;
    }


    /*
        Marks the callback stream as closed and releases every thread currently
        blocked waiting for a callback.
    */
    synchronized void CloseCallbackStream(){
        stream_opened = false ;
        InterruptWaitForCallback() ;
    }


    boolean IsStreamOpen(){
        return stream_opened ;
    }


    /*
        Returns the number of callbacks currently queued. This method does not
        take the monitor itself.
    */
    int GetSize(){
        return queue.size() ;
    }


    /*
        Flags the current wait as voluntarily interrupted and wakes up all waiters.
    */
    synchronized private void InterruptWaitForCallback(){
        Thread t = Thread.currentThread() ;
        InlineJavaUtils.debug(3, "interrupting wait for callback request in " + t.getName() + "...") ;
        wait_interrupted = true ;
        notifyAll() ;
    }
}
|