1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
(* $Id$ *)
open Uq_engines.Operators
module Cond = struct
type t = Netsys_oothr.condition
let compare = compare
end
module CondSet = Set.Make(Cond)
type monitor =
{ esys : Unixqueue.event_system;
mutable esys_running : bool;
mutable waiters : CondSet.t;
(* All threads waiting for an engine to complete have their own
condition variable. The variable is signalled when the engine
is finished (unless it is this thread that runs the esys).
We collect all these variables for another purpose, though.
When the current esys runner is done, another thread must be
woken up. We just take one condition from this set.
The alternative would be to use a single condition variable
for all conditions. This is simpler, but the downside is that
we need then to broadcast it from time to time.
*)
mutex : Netsys_oothr.mutex;
}
exception Esys_exit
let create_monitor esys =
{ esys = esys;
esys_running = false;
waiters = CondSet.empty;
mutex = !Netsys_oothr.provider # create_mutex()
}
let monitor_run mon f arg =
let cond = !Netsys_oothr.provider # create_condition() in
let result = ref None in
let esys_owner = ref false in
let g = Unixqueue.new_group mon.esys in
Unixqueue.once mon.esys g 0.0
(fun () ->
let inner_e =
try f mon.esys arg
with error -> eps_e (`Error error) mon.esys in
ignore(
inner_e
>> (fun st ->
mon.mutex # lock();
result := Some st;
if !esys_owner then
Unixqueue.once mon.esys g 0.0 (fun () -> raise Esys_exit)
else
cond # signal();
mon.mutex # unlock();
`Done ()
)
)
);
mon.mutex # lock();
while !result = None do
while !result = None && mon.esys_running do
mon.waiters <- CondSet.add cond mon.waiters;
cond # wait mon.mutex;
mon.waiters <- CondSet.remove cond mon.waiters;
done;
if !result = None then (
mon.esys_running <- true;
esys_owner := true;
mon.mutex # unlock();
( try
Unixqueue.run mon.esys
with
| Esys_exit -> ()
| error ->
Netlog.logf `Crit
"Uq_mt.monitor: caught exception: %s"
(Netexn.to_string error)
);
mon.mutex # lock();
esys_owner := false;
mon.esys_running <- false
)
done;
if not mon.esys_running && mon.waiters <> CondSet.empty then
(CondSet.choose mon.waiters) # signal();
mon.mutex # unlock();
match !result with
| Some(`Done x) -> x
| Some(`Error e) -> raise e
| Some(`Aborted) -> failwith "Uq_mt.monitor: aborted"
| None -> assert false
let monitor_do mon f arg =
monitor_run mon
(fun esys arg ->
let result = f arg in
eps_e (`Done result) esys
)
arg
let monitor_async mon f arg =
monitor_run mon
(fun esys arg ->
let e, signal = Uq_engines.signal_engine esys in
let emit get_result =
signal
( try `Done(get_result())
with error -> `Error error
) in
f arg emit;
e
)
arg
|