(* TEST *)
open Printf
open Effect
open Effect.Deep
(** {1 Coroutines} *)
(** A channel on which tasks exchange values of type ['a].

    Both fields are FIFO queues of suspended tasks:
    - [senders] pairs each pending value with the continuation of the task
      that sent it;
    - [receivers] holds the continuations of tasks waiting for a value. *)
type 'a channel = {
  senders : ('a * (unit, unit) continuation) Queue.t;
  receivers : ('a, unit) continuation Queue.t;
}

(** [new_channel ()] returns a fresh channel whose two queues are empty. *)
let new_channel () =
  let senders = Queue.create () in
  let receivers = Queue.create () in
  { senders; receivers }
(** Effects understood by the scheduler [corun]. *)
type _ eff +=
  | Spawn : (unit -> unit) -> unit eff
      (** [Spawn f]: start [f] as a new task. *)
  | Yield : unit eff
      (** Let another runnable task run. *)
  | Send : 'a channel * 'a -> unit eff
      (** [Send (ch, v)]: pass [v] to a receiver on [ch]. *)
  | Recv : 'a channel -> 'a eff
      (** [Recv ch]: obtain a value sent on [ch]. *)

(** Raised by a task to end itself; [corun] handles it like a normal
    return of the task. *)
exception Terminate
(** [spawn f] performs [Spawn f], asking the enclosing handler to start [f]
    as a new task. *)
let spawn f = perform @@ Spawn f

(** [yield ()] performs [Yield], giving the enclosing handler the chance to
    run another task. *)
let yield () = perform Yield

(** [terminate ()] ends the calling task by raising [Terminate]. *)
let terminate () = raise Terminate

(** [send ch v] performs [Send (ch, v)] to pass [v] over channel [ch]. *)
let send ch v = perform @@ Send (ch, v)

(** [recv ch] performs [Recv ch] and returns the value the handler resumes
    it with. *)
let recv ch = perform @@ Recv ch
(** The queue of runnable tasks, served in FIFO order. *)
let runnable : (unit -> unit) Queue.t = Queue.create ()

(** [suspend f] appends the task [f] to the back of [runnable]. *)
let suspend f = Queue.push f runnable

(** [restart ()] removes the task at the front of [runnable] and runs it.
    It does nothing when the queue is empty. *)
let restart () =
  match Queue.take_opt runnable with
  | Some next -> next ()
  | None -> ()
(** The scheduler.

    [corun f] runs [f] as a task and handles the effects it performs:
    - when [f] returns or raises [Terminate], the next runnable task is
      restarted;
    - [Spawn child] queues the current task and runs [child] at once under
      its own [corun];
    - [Yield] queues the current task and restarts the next runnable one;
    - [Send (ch, msg)] resumes a waiting receiver with [msg] (after queuing
      the sender), or else parks the sender together with [msg] on [ch];
    - [Recv ch] takes the value of a parked sender (after queuing that
      sender), or else parks the receiver on [ch]. *)
let rec corun (f : unit -> unit) =
  match f () with
  | () | exception Terminate -> restart ()
  | effect Spawn child, parent ->
      suspend (continue parent);
      corun child
  | effect Yield, self ->
      suspend (continue self);
      restart ()
  | effect Send (ch, msg), sender -> (
      match Queue.take_opt ch.receivers with
      | None ->
          (* Nobody is waiting: block the sender on the channel. *)
          Queue.add (msg, sender) ch.senders;
          restart ()
      | Some receiver ->
          suspend (continue sender);
          continue receiver msg)
  | effect Recv ch, receiver -> (
      match Queue.take_opt ch.senders with
      | None ->
          (* Nothing to receive yet: block the receiver on the channel. *)
          Queue.add receiver ch.receivers;
          restart ()
      | Some (msg, sender) ->
          suspend (continue sender);
          continue receiver msg)
(** Example of use.

    [task name n] counts from 1 to [n]; at each step it prints [name]
    followed by the counter and a space, then yields.  The task terminates
    itself as soon as the counter reaches 7, before printing it. *)
let task name n =
  let rec step i =
    if i <= n then begin
      if i >= 7 then terminate ();
      printf "%s%d " name i;
      yield ();
      step (i + 1)
    end
  in
  step 1
(* Run three printing tasks of different lengths under the scheduler, then
   end the output line. *)
let () =
  let start (name, n) = spawn (fun () -> task name n) in
  corun (fun () -> List.iter start [ ("a", 8); ("b", 3); ("c", 4) ]);
  print_newline ()
(* Two spawned tasks each send one string on a shared channel; the main task
   receives both and prints them on one line. *)
let () =
  let ch = new_channel () in
  let main () =
    spawn (fun () -> send ch "a");
    spawn (fun () -> send ch "b");
    let first = recv ch in
    printf "%s " first;
    let second = recv ch in
    printf "%s\n" second
  in
  corun main
(** Eratosthenes' sieve using a pipeline of filters.

    [eratosthenes input] receives a first number from [input], prints it, and
    spawns another [eratosthenes] stage reading from a fresh channel.  It then
    loops forever, forwarding to that channel every number received from
    [input] that is not a multiple of the first one. *)
let rec eratosthenes input =
  let prime = recv input in
  printf "%d " prime;
  let downstream = new_channel () in
  spawn (fun () -> eratosthenes downstream);
  let rec filter () =
    let candidate = recv input in
    if candidate mod prime <> 0 then send downstream candidate;
    filter ()
  in
  filter ()
(* Feed the integers 2 to 1000 into a sieve pipeline running under the
   scheduler, then end the output line. *)
let () =
  let generate () =
    let ints = new_channel () in
    spawn (fun () -> eratosthenes ints);
    for i = 2 to 1000 do
      send ints i
    done
  in
  corun generate;
  print_newline ()
|