1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
|
(***********************************************************************)
(* *)
(* Objective Caml *)
(* *)
(* David Nowak and Xavier Leroy, projet Cristal, INRIA Rocquencourt *)
(* *)
(* Copyright 1996 Institut National de Recherche en Informatique et *)
(* en Automatique. All rights reserved. This file is distributed *)
(* under the terms of the GNU Library General Public License, with *)
(* the special exception on linking described in file ../../LICENSE. *)
(* *)
(***********************************************************************)
(* $Id: event.ml 9547 2010-01-22 12:48:24Z doligez $ *)
(* Events *)
(* The operations that one synchronization attempt performs on a single
communication offer. The synchronization functions of this file call
poll first; basic_sync calls suspend on every offer when no poll
succeeded, and result is called on the offer that was performed. *)
type 'a basic_event =
{ poll: unit -> bool;
(* If communication can take place immediately, perform it and
return true; otherwise return false. *)
suspend: unit -> unit;
(* Offer the communication on the channel and get ready
to suspend current process. *)
result: unit -> 'a }
(* Return the result of the communication *)
(* A behavior instantiates one communication offer for one synchronization
attempt. As used by basic_sync and basic_poll, the arguments are: the
performed cell shared by every alternative of the attempt, the condition
variable associated with the attempt, and the index of this offer in the
array of alternatives. *)
type 'a behavior = int ref -> Condition.t -> int -> 'a basic_event
(* Event expressions. They are inert descriptions; flatten_event turns one
into a flat list of behaviors when sync or poll is called. *)
type 'a event =
(* A single communication offer. *)
Communication of 'a behavior
(* A choice among several events. *)
| Choose of 'a event list
(* An event together with a function that do_aborts runs when the
communication finally performed is not one nested under this node. *)
| WrapAbort of 'a event * (unit -> unit)
(* An event computed on demand: flatten_event applies the function. *)
| Guard of (unit -> 'a event)
(* Communication channels *)
(* A channel is a pair of queues of pending offers. The fields are mutable
because suspend replaces a queue by its cleaned-up copy (see
cleanup_queue) before adding a new offer. *)
type 'a channel =
{ mutable writes_pending: 'a communication Queue.t;
(* All offers to write on it *)
mutable reads_pending: 'a communication Queue.t }
(* All offers to read from it *)
(* Communication offered *)
(* One queued offer. The performed cell and the condition are the ones
handed to the behavior, so they are shared by all the offers made by the
same synchronization attempt. *)
and 'a communication =
{ performed: int ref; (* -1 if not performed yet, set to the number *)
(* of the matching communication after rendez-vous. *)
condition: Condition.t; (* To restart the blocked thread. *)
mutable data: 'a option; (* The data sent or received. *)
event_number: int } (* Event number in select *)
(* [new_channel ()] returns a fresh channel on which no read offer and no
   write offer is pending. *)
let new_channel () =
  let reads = Queue.create () in
  let writes = Queue.create () in
  { writes_pending = writes; reads_pending = reads }
(* Basic synchronization function *)
(* Single global mutex of this module. basic_sync and basic_poll hold it
while they run the poll functions, and basic_sync also holds it while it
runs the suspend functions and waits on its condition variable. *)
let masterlock = Mutex.create()
(* [do_aborts abort_env genev performed] runs the abort functions that
   apply once a synchronization attempt is over.

   - [abort_env] associates an identifier with each abort function.
   - [genev.(i)] carries, as its second component, the identifiers that
     enclose alternative [i].
   - [performed] is the index of the alternative that took place, or a
     negative number when none did.

   When [performed] is non-negative, every abort function whose identifier
   is not listed for that alternative is run; when it is negative, all of
   them are run.  Functions run in the order of [abort_env].  With an
   empty [abort_env] nothing happens and [genev] is not inspected. *)
let do_aborts abort_env genev performed =
  match abort_env with
  | [] -> ()
  | _ :: _ ->
    if performed < 0 then
      List.iter (fun (_, abort) -> abort ()) abort_env
    else begin
      let spared = snd genev.(performed) in
      let run_unless_spared (id, abort) =
        if not (List.mem id spared) then abort () in
      List.iter run_unless_spared abort_env
    end
(* [basic_sync abort_env genev] blocks the calling thread until one of the
   alternatives in [genev] has been performed, and returns its result.

   - [genev.(i)] is a pair of a behavior and the list of abort identifiers
     enclosing it; the array must be non-empty (the callers in this file
     go through [scramble_array], which rejects empty arrays).
   - [abort_env] is the list of abort functions handed to [do_aborts].

   Every alternative is first polled under [masterlock].  If none can
   proceed, every alternative is suspended and the thread waits on its
   condition variable until the shared [performed] cell holds the index
   of the alternative that took place. *)
let basic_sync abort_env genev =
  let performed = ref (-1) in
  let condition = Condition.create () in
  (* Instantiate each behavior with the shared cell, the condition and its
     own index.  [Array.make] replaces [Array.create], which no longer
     exists in recent versions of the standard library. *)
  let bev =
    Array.make (Array.length genev)
      (fst genev.(0) performed condition 0) in
  for i = 1 to Array.length genev - 1 do
    bev.(i) <- (fst genev.(i)) performed condition i
  done;
  (* See if any of the events can be performed right away; stops at the
     first poll that succeeds. *)
  let rec poll_events i =
    if i >= Array.length bev
    then false
    else bev.(i).poll () || poll_events (i + 1) in
  Mutex.lock masterlock;
  if not (poll_events 0) then begin
    (* Suspend on all events *)
    for i = 0 to Array.length bev - 1 do bev.(i).suspend () done;
    (* Wait until a partner has recorded which event was performed.  The
       predicate is re-checked after every wake-up: a condition variable
       may wake up spuriously, and returning with [!performed = -1] would
       make the array access below fail. *)
    while !performed < 0 do
      Condition.wait condition masterlock
    done
  end;
  Mutex.unlock masterlock;
  (* Extract the result *)
  match abort_env with
  | [] ->
    (* Preserve the tail call when there is nothing to abort *)
    bev.(!performed).result ()
  | _ :: _ ->
    let num = !performed in
    let result = bev.(num).result () in
    (* Handle the aborts and return the result *)
    do_aborts abort_env genev num;
    result
(* [scramble_array a] applies a random permutation to [a] in place and
   returns [a] itself.  Positions are visited from the last one down to
   index 1, each being swapped with a position drawn by [Random.int]
   among the indices not greater than it.
   Raises [Invalid_argument "Event.choose"] when [a] is empty. *)
let scramble_array a =
  let swap i j =
    let saved = a.(i) in
    a.(i) <- a.(j);
    a.(j) <- saved in
  match Array.length a with
  | 0 -> invalid_arg "Event.choose"
  | len ->
    let rec shuffle_from i =
      if i >= 1 then begin
        swap i (Random.int (i + 1));
        shuffle_from (i - 1)
      end in
    shuffle_from (len - 1);
    a
(* Main synchronization function *)
(* [gensym ()] returns a new integer at each call: 1 the first time, then
   2, 3, and so on.  The counter is private to the closure.
   [flatten_event] uses these integers to identify abort functions. *)
let gensym =
  let counter = ref 0 in
  fun () ->
    counter := !counter + 1;
    !counter
(* [flatten_event abort_list accu accu_abort ev] walks the event tree [ev]
   and returns the pair of accumulators extended with what it found:

   - each [Communication] adds its behavior to [accu], paired with
     [abort_list], the identifiers of the [WrapAbort] nodes above it;
   - each [WrapAbort] gets a fresh identifier from [gensym]; its abort
     function is added to [accu_abort] under that identifier, and the
     identifier is pushed on [abort_list] for the wrapped event;
   - the members of a [Choose] are processed from left to right;
   - the function of a [Guard] is applied and its result processed.

   New entries are consed in front, so both result lists are in reverse
   order of discovery. *)
let rec flatten_event
    (abort_list : int list)
    (accu : ('a behavior * int list) list)
    (accu_abort : (int * (unit -> unit)) list)
    ev =
  match ev with
  | Communication bev ->
    ((bev, abort_list) :: accu, accu_abort)
  | WrapAbort (wrapped, abort_fn) ->
    let id = gensym () in
    flatten_event (id :: abort_list) accu ((id, abort_fn) :: accu_abort) wrapped
  | Choose alternatives ->
    List.fold_left
      (fun (offers, aborts) alt -> flatten_event abort_list offers aborts alt)
      (accu, accu_abort)
      alternatives
  | Guard make_event ->
    flatten_event abort_list accu accu_abort (make_event ())
(* [sync ev] flattens [ev], puts the resulting alternatives in random
   order, and hands them to [basic_sync], whose result it returns.
   [scramble_array] raises [Invalid_argument "Event.choose"] when the
   flattened event contains no communication at all. *)
let sync ev =
  let (offers, abort_env) = flatten_event [] [] [] ev in
  let candidates = scramble_array (Array.of_list offers) in
  basic_sync abort_env candidates
(* Event polling -- like sync, but non-blocking *)
(* [basic_poll abort_env genev] is the non-blocking counterpart of
   [basic_sync]: every alternative of [genev] is polled once under
   [masterlock], and nothing is ever suspended.

   Returns [Some r] when one alternative could be performed immediately,
   [r] being its result; the abort functions are then run by [do_aborts]
   with the index of that alternative.  Returns [None] otherwise, after
   [do_aborts] has been called with index -1.

   [genev] must be non-empty (the caller in this file goes through
   [scramble_array], which rejects empty arrays). *)
let basic_poll abort_env genev =
  let performed = ref (-1) in
  let condition = Condition.create () in
  (* [Array.make] replaces [Array.create], which no longer exists in
     recent versions of the standard library. *)
  let bev =
    Array.make (Array.length genev)
      (fst genev.(0) performed condition 0) in
  for i = 1 to Array.length genev - 1 do
    bev.(i) <- fst genev.(i) performed condition i
  done;
  (* See if any of the events can be performed right away; stops at the
     first poll that succeeds. *)
  let rec poll_events i =
    if i >= Array.length bev
    then false
    else bev.(i).poll () || poll_events (i + 1) in
  Mutex.lock masterlock;
  let ready = poll_events 0 in
  if ready then begin
    (* Extract the result *)
    Mutex.unlock masterlock;
    let result = Some (bev.(!performed).result ()) in
    do_aborts abort_env genev !performed;
    result
  end else begin
    (* Cancel the communication offers: a non-negative [performed] makes
       any record sharing this cell read as already performed. *)
    performed := 0;
    Mutex.unlock masterlock;
    do_aborts abort_env genev (-1);
    None
  end
(* [poll ev] flattens [ev], puts the resulting alternatives in random
   order, and hands them to [basic_poll], whose result it returns.
   [scramble_array] raises [Invalid_argument "Event.choose"] when the
   flattened event contains no communication at all. *)
let poll ev =
  let (offers, abort_env) = flatten_event [] [] [] ev in
  let candidates = scramble_array (Array.of_list offers) in
  basic_poll abort_env candidates
(* [cleanup_queue q] returns a new queue holding, in the same order, the
   offers of [q] whose [performed] cell still contains -1.  [q] itself is
   left unchanged. *)
let cleanup_queue q =
  let pending = Queue.create () in
  let keep_if_pending comm =
    match !(comm.performed) with
    | -1 -> Queue.add comm pending
    | _ -> () in
  Queue.iter keep_if_pending q;
  pending
(* Event construction *)
(* [always data] is an event that can always be performed at once: its
   poll records its own index in the shared cell and succeeds, its suspend
   does nothing, and its result is [data].  The condition variable is not
   used. *)
let always data =
  let behavior performed _condition evnum =
    let claim () =
      performed := evnum;
      true in
    { poll = claim;
      suspend = (fun () -> ());
      result = (fun () -> data) } in
  Communication behavior
(* [send channel data] is the event that offers [data] on [channel]; its
   result is [()].

   Polling takes read offers off [channel.reads_pending] until it finds
   one that has not been performed yet; offers found already performed
   are dropped.  On a match, the value is stored in the reader's record,
   both sides record their own event number in their [performed] cell,
   and the reader's condition is signalled.  Polling fails once the queue
   is exhausted.

   Suspending replaces [channel.writes_pending] by its cleaned-up copy
   and appends this write offer to it. *)
let send channel data =
  let behavior performed condition evnum =
    let wcomm =
      { performed = performed;
        condition = condition;
        data = Some data;
        event_number = evnum } in
    let rec try_rendezvous () =
      if Queue.is_empty channel.reads_pending then
        false
      else begin
        let rcomm = Queue.take channel.reads_pending in
        if !(rcomm.performed) >= 0 then
          (* Stale offer, look at the next one *)
          try_rendezvous ()
        else begin
          rcomm.data <- wcomm.data;
          performed := evnum;
          rcomm.performed := rcomm.event_number;
          Condition.signal rcomm.condition;
          true
        end
      end in
    let enqueue_offer () =
      channel.writes_pending <- cleanup_queue channel.writes_pending;
      Queue.add wcomm channel.writes_pending in
    { poll = try_rendezvous;
      suspend = enqueue_offer;
      result = (fun () -> ()) } in
  Communication behavior
(* [receive channel] is the event that takes a value from [channel]; its
   result is the value received.

   Polling takes write offers off [channel.writes_pending] until it finds
   one that has not been performed yet; offers found already performed
   are dropped.  On a match, the value is copied into this read record,
   both sides record their own event number in their [performed] cell,
   and the writer's condition is signalled.  Polling fails once the queue
   is exhausted.

   Suspending replaces [channel.reads_pending] by its cleaned-up copy and
   appends this read offer to it.

   The result function raises [Invalid_argument "Event.receive"] if no
   value has been stored in the read record. *)
let receive channel =
  let behavior performed condition evnum =
    let rcomm =
      { performed = performed;
        condition = condition;
        data = None;
        event_number = evnum } in
    let rec try_rendezvous () =
      if Queue.is_empty channel.writes_pending then
        false
      else begin
        let wcomm = Queue.take channel.writes_pending in
        if !(wcomm.performed) >= 0 then
          (* Stale offer, look at the next one *)
          try_rendezvous ()
        else begin
          rcomm.data <- wcomm.data;
          performed := evnum;
          wcomm.performed := wcomm.event_number;
          Condition.signal wcomm.condition;
          true
        end
      end in
    let enqueue_offer () =
      channel.reads_pending <- cleanup_queue channel.reads_pending;
      Queue.add rcomm channel.reads_pending in
    let received () =
      match rcomm.data with
      | Some res -> res
      | None -> invalid_arg "Event.receive" in
    { poll = try_rendezvous;
      suspend = enqueue_offer;
      result = received } in
  Communication behavior
(* [choose evl] packages the alternatives [evl] as one [Choose] event.
   Nothing is evaluated here; the list is only traversed when the event
   is flattened. *)
let choose alternatives = Choose alternatives
(* [wrap_abort ev fn] attaches the abort function [fn] to [ev] by building
   a [WrapAbort] node.  [fn] is not called here. *)
let wrap_abort ev fn =
  let node = WrapAbort (ev, fn) in
  node
(* [guard fn] is the event computed by [fn]: it builds a [Guard] node, and
   [fn] is only applied when the event is flattened. *)
let guard make_event = Guard make_event
(* [wrap ev fn] is an event that behaves like [ev] except that [fn] is
   applied to its result.

   The transformation is pushed down to the [Communication] leaves, whose
   poll and suspend functions are kept as they are and whose result
   function is composed with [fn].  Abort functions of [WrapAbort] nodes
   are kept unchanged, and a [Guard] is wrapped lazily: its function is
   only applied when the new guard itself is forced. *)
let rec wrap ev fn =
  match ev with
  | Communication genev ->
    let wrapped performed condition evnum =
      let inner = genev performed condition evnum in
      { poll = inner.poll;
        suspend = inner.suspend;
        result = (fun () -> fn (inner.result ())) } in
    Communication wrapped
  | Choose alternatives ->
    Choose (List.map (fun alt -> wrap alt fn) alternatives)
  | WrapAbort (wrapped_ev, abort_fn) ->
    WrapAbort (wrap wrapped_ev fn, abort_fn)
  | Guard make_event ->
    Guard (fun () -> wrap (make_event ()) fn)
(* Convenience functions *)
(* [select evl] synchronizes on the choice among the events of [evl]; it
   is [sync] applied to the corresponding [Choose] event. *)
let select alternatives =
  sync (Choose alternatives)
|