
|
(* $Id$ *)
(* Chameneos game implemented with message passing
see
http://cedric.cnam.fr/PUBLIS/RC474.pdf
http://shootout.alioth.debian.org/u32/benchmark.php?test=chameneosredux&lang=all
This solution does not claim to be fast in any way. It is just a
coding example. In particular, message passing is not optimal here
because the messages are very short (and thus the whole message
copying machinery consumes a lot of setup time relative to total
time), and the worker processes do almost nothing (Color.complement).
So this program mostly measures the bare minimum time needed for
synchronization; any real-world workload, with larger messages and more
work per process, will fare much better.
*)
open Printf
(* [spell_int i] writes the decimal representation of [i] to stdout, one
   English word per digit, each word preceded by a single blank
   (e.g. 305 gives " three zero five").
   Raises [Failure "unexpected char"] on the first character that is not a
   digit, which is what happens for the sign of a negative [i]. *)
let spell_int i =
  let words =
    [| "zero"; "one"; "two"; "three"; "four";
       "five"; "six"; "seven"; "eight"; "nine" |] in
  let word_of c =
    match c with
    | '0' .. '9' -> words.(Char.code c - Char.code '0')
    | _ -> failwith "unexpected char"
  in
  String.iter (fun c -> printf " %s" (word_of c)) (string_of_int i)
(* The three colors a chameneos can have, plus the color arithmetic of the
   game. *)
module Color = struct
  type t =
    | Blue
    | Red
    | Yellow

  (* [complement t t'] is the color resulting from a meeting of colors [t]
     and [t']: two equal colors give that same color, two different colors
     give the remaining third one. *)
  let complement t t' =
    match t, t' with
    | Blue, Blue | Red, Red | Yellow, Yellow -> t
    | Blue, Red | Red, Blue -> Yellow
    | Blue, Yellow | Yellow, Blue -> Red
    | Red, Yellow | Yellow, Red -> Blue

  (* Lower-case English name of a color. *)
  let to_string c =
    match c with
    | Blue -> "blue"
    | Red -> "red"
    | Yellow -> "yellow"

  (* Every color, in declaration order. *)
  let all = [ Blue; Red; Yellow ]
end
(* The record describing one chameneos creature. It lives in a module of
   its own so that both Meeting_place (which puts it into its request and
   response messages) and Chameneos (which includes it) can refer to it. *)
module Chameneos_type = struct
type t = {
(* Identifier of the creature; Chameneos.create numbers them 0, 1, 2, ... *)
id : int;
(* Current color; replaced by Color.complement after every meeting *)
mutable color : Color.t;
(* Number of meetings this creature has taken part in so far *)
mutable meetings : int;
(* Number of meetings in which the mate carried the same id *)
mutable meetings_with_self : int;
}
end
module Meeting_place = struct
  (* The meeting place of this solution is a process of its own that is
     driven purely by messages (i.e. there is no "monitor" as suggested by
     the original article). *)

  (* Shared variables holding the resource ID of a request box *)
  module Id_var =
    Netplex_sharedvar.Make_var_type(struct type t = Netmcore.res_id end)

  (* Name of the shared variable under which the meeting place running as
     process [pid] publishes the ID of its request box *)
  let get_box_id_var (`Process pid) =
    "Meeting_place.box." ^ string_of_int pid

  (* Publish [box_id] as the request box of the meeting place [pid] *)
  let set_box_id pid box_id =
    let var_name = get_box_id_var pid in
    ignore (Netplex_sharedvar.create_var ~enc:true var_name);
    Id_var.set var_name box_id

  (* Look up the request box ID published for the meeting place [pid].
     wait_for_enc_value is called before reading - presumably so that a
     caller arriving before set_box_id blocks until the value exists. *)
  let get_box_id pid =
    let var_name = get_box_id_var pid in
    ignore (Netplex_sharedvar.create_var ~enc:true var_name);
    ignore (Netplex_sharedvar.wait_for_enc_value var_name);
    Id_var.get var_name

  type config =
    { num_slots : int;   (* #slots of the request box *)
      meetings : int;    (* number of meetings to arrange in total *)
    }

  type meet_request =
    { me : Chameneos_type.t;            (* the creature asking for a mate *)
      response_box : Netmcore.res_id;   (* box the answer has to go to *)
    }

  type request =
    | Meet_request of meet_request
    | Shutdown

  type response =
    { mate_opt : Chameneos_type.t option }
    (* None = all meetings done *)

  type state =
    | Empty
    | First of int * meet_request
      (* slot, request of the creature that is waiting for a mate *)

  (* Body of the meeting place process: pairs up incoming meet requests
     until [config.meetings] meetings have happened, answers every later
     request with [mate_opt = None], and terminates on [Shutdown]. *)
  let meeting_place config =
    (* 512: just an upper limit for the message size *)
    let (requests : request ref Netcamlbox.camlbox), requests_id =
      Netmcore_camlbox.create_camlbox "chameneos" config.num_slots 512 in
    (* Announce the box in a global variable so the workers can find it *)
    let self = Netmcore.self_process_id () in
    set_box_id self requests_id;
    (* Cache of senders for the response boxes, keyed by resource number *)
    let senders = Hashtbl.create 29 in
    let get_response_box id : response Netcamlbox.camlbox_sender =
      try Hashtbl.find senders id
      with Not_found ->
        let sender = Netmcore_camlbox.lookup_camlbox_sender (`Resource id) in
        Hashtbl.add senders id sender;
        sender
    in
    let sender_of mreq =
      let `Resource r = mreq.response_box in
      get_response_box r
    in
    let meetings_left = ref config.meetings in
    let state = ref Empty in
    let live = ref true in
    let handle slot =
      (* The message is used in place (no copy!), so nothing reachable from
         it may be read any more once its slot has been deleted. *)
      match !(Netcamlbox.camlbox_get requests slot) with
      | Shutdown ->
          Netlog.logf `Debug "Got shutdown request";
          Netcamlbox.camlbox_delete requests slot;
          live := false
      | Meet_request mreq ->
          if !meetings_left <= 0 then begin
            (* Quota used up: tell the creature to stop *)
            let reply_box = sender_of mreq in
            Netcamlbox.camlbox_delete requests slot;
            Netcamlbox.camlbox_send reply_box { mate_opt = None }
          end
          else begin
            match !state with
            | Empty ->
                (* Keep the request (and its slot) until a mate shows up *)
                state := First (slot, mreq)
            | First (first_slot, first_req) ->
                let first_box = sender_of first_req in
                Netcamlbox.camlbox_send
                  first_box { mate_opt = Some mreq.me };
                let second_box = sender_of mreq in
                Netcamlbox.camlbox_send
                  second_box { mate_opt = Some first_req.me };
                decr meetings_left;
                state := Empty;
                Netcamlbox.camlbox_delete requests first_slot;
                Netcamlbox.camlbox_delete requests slot
          end
    in
    while !live do
      List.iter handle (Netcamlbox.camlbox_wait requests)
    done;
    ignore (Netplex_sharedvar.delete_var (get_box_id_var self))

  let fork_meeting_place, join_meeting_place =
    Netmcore_process.def_process meeting_place

  (* Start a meeting place process; returns its process ID *)
  let start config =
    Netmcore_process.start fork_meeting_place config

  (* Join the meeting place process [pid], dropping its result *)
  let join pid =
    ignore (Netmcore_process.join join_meeting_place pid)

  (* Send a [Shutdown] request to the meeting place [pid] *)
  let shutdown pid =
    let sender =
      Netmcore_camlbox.lookup_camlbox_sender (get_box_id pid) in
    Netcamlbox.camlbox_send sender (ref Shutdown)

  type connector =
    { mp_req_box : request ref Netcamlbox.camlbox_sender;
      mp_resp_box : response Netcamlbox.camlbox;
      mp_resp_box_id : Netmcore.res_id;
    }

  (* Connect the calling process to the meeting place [pid]: a private
     response box with 2 slots is created, and a sender for the request
     box of the meeting place is looked up. *)
  let connect pid =
    let place_box_id = get_box_id pid in
    let (answers : response Netcamlbox.camlbox), answers_id =
      Netmcore_camlbox.create_camlbox "chameneos" 2 512 in
    let to_place = Netmcore_camlbox.lookup_camlbox_sender place_box_id in
    { mp_req_box = to_place;
      mp_resp_box = answers;
      mp_resp_box_id = answers_id;
    }

  (* [meet pref_slot mp_conn ch] sends a meet request for [ch] and waits
     for the answer. Returns a private copy of the mate, or [None] when
     the meeting place reports that all meetings are done. [pref_slot] is
     handed to camlbox_send both as preferred slot and as slot cell. *)
  let meet pref_slot mp_conn ch =
    let { mp_req_box; mp_resp_box; mp_resp_box_id } = mp_conn in
    let msg = ref (Meet_request { me = ch; response_box = mp_resp_box_id }) in
    Netcamlbox.camlbox_send ~prefer:!pref_slot ~slot:pref_slot mp_req_box msg;
    match Netcamlbox.camlbox_wait mp_resp_box with
    | [ slot ] ->
        let answer = Netcamlbox.camlbox_get mp_resp_box slot in
        (* No camlbox_get_copy here: for a message this simple, building
           the copy by hand is cheaper. It has to happen before the slot
           is deleted. *)
        let mate =
          match answer.mate_opt with
          | None -> None
          | Some m ->
              Some
                { Chameneos_type.id = m.Chameneos_type.id;
                  Chameneos_type.color = m.Chameneos_type.color;
                  Chameneos_type.meetings = m.Chameneos_type.meetings;
                  Chameneos_type.meetings_with_self =
                    m.Chameneos_type.meetings_with_self;
                }
        in
        Netcamlbox.camlbox_delete mp_resp_box slot;
        mate
    | _ ->
        assert false
end
(* A chameneos creature together with the worker process that drives it. *)
module Chameneos = struct
  include Chameneos_type

  (* [create color] makes a fresh creature of the given color with zeroed
     counters. Ids are handed out consecutively, starting at 0. *)
  let create =
    let next_id = ref 0 in
    fun color ->
      let id = !next_id in
      incr next_id;
      { id; color; meetings = 0; meetings_with_self = 0 }

  (* Argument and result of a worker process *)
  type run =
    { place_pid : Netmcore.process_id;
      chameneos : t;
    }

  (* Body of a worker process: keeps asking the meeting place for a mate
     until it answers [None], updating the counters and the color after
     every meeting. Returns [arg] with the final state of the creature. *)
  let run arg =
    let me = arg.chameneos in
    let conn = Meeting_place.connect arg.place_pid in
    (* pref_slot is meant to avoid cache bouncing: as long as one process
       keeps reusing one slot, the memory of that slot stays owned by the
       same cache *)
    let pref_slot = ref 0 in
    let finished = ref false in
    while not !finished do
      match Meeting_place.meet pref_slot conn me with
      | None ->
          finished := true
      | Some mate ->
          me.meetings <- me.meetings + 1;
          if me.id = mate.id then
            me.meetings_with_self <- me.meetings_with_self + 1;
          me.color <- Color.complement me.color mate.color
    done;
    { place_pid = arg.place_pid; chameneos = me }

  let fork_chameneos, join_chameneos =
    Netmcore_process.def_process run

  (* Start a worker process for [arg]; returns its process ID *)
  let start arg =
    Netmcore_process.start fork_chameneos arg

  (* Join the worker [pid] and return its result.
     Raises [Failure "no result from chameneos"] if there is none. *)
  let join pid =
    match Netmcore_process.join join_chameneos pid with
    | Some result -> result
    | None -> failwith "no result from chameneos"
end
(* Coordinator: runs complete games and prints their results. *)
module Compute = struct
  (* [work colors n] plays one game of [n] meetings with one creature per
     element of [colors]. Prints the initial colors, then one line per
     creature (number of meetings followed by the spelled-out number of
     meetings with itself), then the spelled-out total of all meetings. *)
  let work colors n =
    Netlog.logf `Debug "Compute.work n=%d" n;
    List.iter (fun c -> printf " %s" (Color.to_string c)) colors;
    printf "\n";
    let place_pid =
      Meeting_place.start
        { Meeting_place.num_slots = 2 * List.length colors;
          meetings = n
        } in
    let `Process place_num = place_pid in
    Netlog.logf `Debug "Compute.work place_pid=%d" place_num;
    let creatures = List.map Chameneos.create colors in
    let worker_pids =
      List.map
        (fun c ->
           Chameneos.start
             { Chameneos.place_pid = place_pid; chameneos = c })
        creatures in
    Netlog.logf `Debug "Compute.work started chameneos processes";
    let results = List.map Chameneos.join worker_pids in
    Netlog.logf `Debug "Compute.work joined chamaneos processes";
    (* The meeting place is only shut down once all workers are done *)
    Meeting_place.shutdown place_pid;
    Meeting_place.join place_pid;
    Netlog.logf `Debug "Compute.work joined place";
    let total =
      List.fold_left
        (fun acc res ->
           let ch = res.Chameneos.chameneos in
           printf "%d" ch.Chameneos.meetings;
           spell_int ch.Chameneos.meetings_with_self;
           printf "\n";
           acc + ch.Chameneos.meetings)
        0
        results in
    spell_int total;
    printf "\n%!"

  (* [compute n] plays two games of [n] meetings each, first with three and
     then with ten creatures, printing an empty line after each game. *)
  let compute n =
    let line_ups =
      [ [ Color.Blue; Color.Red; Color.Yellow ];
        [ Color.Blue; Color.Red; Color.Yellow; Color.Red; Color.Yellow;
          Color.Blue; Color.Red; Color.Yellow; Color.Red; Color.Blue ];
      ] in
    List.iter
      (fun colors ->
         work colors n;
         printf "\n%!")
      line_ups

  let fork_compute, join_compute =
    Netmcore_process.def_process compute

  (* Start [compute n] as a process of its own; returns its process ID *)
  let start n =
    Netmcore_process.start fork_compute n

  (* Join the compute process [pid], dropping its result *)
  let join pid =
    ignore (Netmcore_process.join join_compute pid)
end
(* Prints the complete complement table to stdout, one line
   "<c1> + <c2> -> <complement>" per ordered pair of colors (the first
   color varying slowest), followed by an empty line and a flush. *)
let print_complements () =
  let name = Color.to_string in
  let pairs =
    List.concat
      (List.map
         (fun c1 -> List.map (fun c2 -> (c1, c2)) Color.all)
         Color.all) in
  List.iter
    (fun (c1, c2) ->
       printf "%s + %s -> %s\n"
         (name c1) (name c2) (name (Color.complement c1 c2)))
    pairs;
  printf "\n%!"
(* Program entry point.

   The number of meetings per game is taken from the first command-line
   argument; 600 is used when the argument is missing or is not an integer.
   After printing the complement table, handlers are registered that
   destroy the Netmcore resources on SIGINT and SIGTERM, and the Netmcore
   environment is started with the compute process as first process. *)
let main () =
  (* Netmcore.Debug.enable := true; *)
  let n =
    (* Only the two expected failures select the default: a missing
       argument (Invalid_argument from the array access) and a
       non-numeric argument (Failure from int_of_string). Any other
       exception is no longer swallowed. *)
    try
      int_of_string (Sys.argv.(1))
    with
      | Invalid_argument _ | Failure _ -> 600
  in
  print_complements ();
  (* Netmcore.Debug.enable := true; *)
  List.iter
    (fun sigcode ->
       Netsys_signal.register_handler
         ~name:"application"
         ~signal:sigcode
         ~callback:(fun _ -> Netmcore.destroy_resources())
         ~keep_default:true
         ()
    )
    [ Sys.sigint; Sys.sigterm ];
  Netmcore.startup
    ~socket_directory:"run_chameneos"
    ~init_ctrl:(fun ctrl ->
                  (* The shared variables used by Meeting_place need this
                     plugin *)
                  ctrl#add_plugin Netplex_sharedvar.plugin;
                  (* ctrl#controller_config#set_max_level `Debug *)
               )
    ~first_process:(fun () ->
                      Netmcore_process.start
                        Compute.fork_compute n)
    ()

let () = main ()
|