1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418
|
(* $Id$ *)
(* Chameneos game implemented with message passing
see
http://cedric.cnam.fr/PUBLIS/RC474.pdf
http://shootout.alioth.debian.org/u32/benchmark.php?test=chameneosredux&lang=all
This solution does not claim to be fast in any way. It is just a
coding example. In particular, message passing is not optimal here
because the messages are very short (so the fixed setup cost of the
message copying machinery dominates the total time), and the worker
processes do almost nothing (just Color.complement).
This program therefore mostly measures the bare minimum time needed
for synchronization; in any real-world example, where messages and
the work per message are larger, the relative overhead will be much
lower.
*)
open Printf
(* [spell_int i] prints the decimal digits of [i] on stdout, one English
   word per digit, each word preceded by a single space (42 is printed
   as " four two").  No newline is written.
   Raises [Failure "unexpected char"] when [string_of_int i] contains a
   character that is not a digit, i.e. the sign of a negative [i]. *)
let spell_int i =
  let digit_names =
    [| "zero"; "one"; "two"; "three"; "four";
       "five"; "six"; "seven"; "eight"; "nine" |]
  in
  let spell_char c =
    match c with
    | '0' .. '9' -> digit_names.(Char.code c - Char.code '0')
    | _ -> failwith "unexpected char"
  in
  String.iter
    (fun digit -> printf " %s" (spell_char digit))
    (string_of_int i)
module Color = struct
  (* The three colours a chameneos can have. *)
  type t =
    | Blue
    | Red
    | Yellow

  (* [complement t t'] is the colour taken after a meeting between
     colours [t] and [t']: two equal colours yield that same colour,
     two different colours yield the remaining third one.  The cases
     are grouped by result. *)
  let complement t t' =
    match t, t' with
    | Blue, Blue | Red, Yellow | Yellow, Red -> Blue
    | Red, Red | Blue, Yellow | Yellow, Blue -> Red
    | Yellow, Yellow | Blue, Red | Red, Blue -> Yellow

  (* Lower-case English name of a colour. *)
  let to_string c =
    match c with
    | Blue -> "blue"
    | Red -> "red"
    | Yellow -> "yellow"

  (* Every colour, in declaration order. *)
  let all = [ Blue; Red; Yellow ]
end
(* Record describing one chameneos creature.  Meeting_place refers to
this type in its messages, and the Chameneos module (defined after
Meeting_place) includes it. *)
module Chameneos_type = struct
type t = {
(* identifier handed out by Chameneos.create from a running counter *)
id : int;
(* current colour; replaced by Color.complement after every meeting *)
mutable color : Color.t;
(* number of meetings this creature has taken part in *)
mutable meetings : int;
(* number of meetings in which the mate carried the same id *)
mutable meetings_with_self : int;
}
end
module Meeting_place = struct
  (* In this solution the meeting place is an independent process to
     which messages are sent (i.e. no "monitor" as suggested by the
     original article).
   *)

  (* Shared-variable accessors for values of type Netmcore.res_id. *)
  module Id_var =
    Netplex_sharedvar.Make_var_type(struct type t = Netmcore.res_id end)

  (* Name of the shared variable under which the meeting place process
     [pid] publishes the id of its request box. *)
  let get_box_id_var (`Process pid) =
    sprintf "Meeting_place.box.%d" pid

  (* Publish [box_id] as the request box of process [pid]. *)
  let set_box_id pid box_id =
    let var_name = get_box_id_var pid in
    ignore (Netplex_sharedvar.create_var ~enc:true var_name);
    Id_var.set var_name box_id

  (* Read the request box id published by process [pid].  The call to
     wait_for_enc_value comes first; presumably it blocks until the
     variable has been set (confirm against the Netplex_sharedvar
     documentation). *)
  let get_box_id pid =
    let var_name = get_box_id_var pid in
    ignore (Netplex_sharedvar.create_var ~enc:true var_name);
    ignore (Netplex_sharedvar.wait_for_enc_value var_name);
    Id_var.get var_name

  type config =
    { num_slots : int;  (* #slots of the request box *)
      meetings : int;   (* number of meetings to arrange before
                           answering every request with "done" *)
    }

  type meet_request =
    { me : Chameneos_type.t;            (* the requesting creature *)
      response_box : Netmcore.res_id;   (* box the answer is sent to *)
    }

  type request =
    | Meet_request of meet_request
    | Shutdown

  type response =
    { mate_opt : Chameneos_type.t option }
    (* None = all meetings done *)

  type state =
    | Empty
    | First of int * meet_request
        (* slot, request: one creature is waiting for a mate *)

  (* Body of the meeting place process.  Creates the request box,
     publishes its id, then serves requests until a Shutdown message
     arrives; finally deletes the shared variable again. *)
  let meeting_place config =
    (* 512: just an upper limit for the message size *)
    let (box : request ref Netcamlbox.camlbox), box_id =
      Netmcore_camlbox.create_camlbox "chameneos" config.num_slots 512 in
    (* Put the ID of the box into a global variable, so all workers
       can get it from there
     *)
    let pid = Netmcore.self_process_id() in
    set_box_id pid box_id;

    (* Cache of sender handles, keyed by the integer inside the
       response box's resource id. *)
    let response_boxes = Hashtbl.create 29 in
    let get_response_box id =
      try
        Hashtbl.find response_boxes id
      with Not_found ->
        let sender = Netmcore_camlbox.lookup_camlbox_sender (`Resource id) in
        Hashtbl.add response_boxes id sender;
        (sender : response Netcamlbox.camlbox_sender)
    in

    let meetings_left = ref config.meetings in
    let state = ref Empty in
    let live = ref true in

    (* Introduce the waiting creature and the newcomer to each other.
       Both answers are sent before either request slot is deleted:
       the requests are read in place from the box (no copy). *)
    let pair_up first_slot first_req second_slot second_req =
      let `Resource first_id = first_req.response_box in
      let `Resource second_id = second_req.response_box in
      let first_box = get_response_box first_id in
      Netcamlbox.camlbox_send
        first_box { mate_opt = Some second_req.me };
      let second_box = get_response_box second_id in
      Netcamlbox.camlbox_send
        second_box { mate_opt = Some first_req.me };
      decr meetings_left;
      state := Empty;
      Netcamlbox.camlbox_delete box first_slot;
      Netcamlbox.camlbox_delete box second_slot
    in

    (* All meetings are done: tell the requester so.  The response box
       is looked up while the request slot still exists. *)
    let refuse slot mreq =
      let `Resource id = mreq.response_box in
      let r_box = get_response_box id in
      Netcamlbox.camlbox_delete box slot;
      Netcamlbox.camlbox_send r_box { mate_opt = None }
    in

    let handle req_slot =
      (* no copy! So be careful... *)
      match !(Netcamlbox.camlbox_get box req_slot) with
      | Shutdown ->
          Netlog.logf `Debug "Got shutdown request";
          Netcamlbox.camlbox_delete box req_slot;
          live := false
      | Meet_request mreq ->
          if !meetings_left <= 0 then
            refuse req_slot mreq
          else begin
            match !state with
            | Empty ->
                state := First (req_slot, mreq)
            | First (first_slot, first_req) ->
                pair_up first_slot first_req req_slot mreq
          end
    in

    while !live do
      List.iter handle (Netcamlbox.camlbox_wait box)
    done;
    ignore (Netplex_sharedvar.delete_var (get_box_id_var pid))

  let fork_meeting_place, join_meeting_place =
    Netmcore_process.def_process meeting_place

  (* Start a meeting place process with the given configuration and
     return its process id. *)
  let start config =
    Netmcore_process.start fork_meeting_place config

  (* Join the meeting place process [pid], discarding its result. *)
  let join pid =
    ignore (Netmcore_process.join join_meeting_place pid)

  (* Send a Shutdown request to the meeting place process [pid]. *)
  let shutdown pid =
    let sender = Netmcore_camlbox.lookup_camlbox_sender (get_box_id pid) in
    Netcamlbox.camlbox_send sender (ref Shutdown)

  type connector =
    { mp_req_box : request ref Netcamlbox.camlbox_sender;
      mp_resp_box : response Netcamlbox.camlbox;
      mp_resp_box_id : Netmcore.res_id;
    }

  (* Build the client-side connection to the meeting place [pid]: a
     freshly created two-slot response box plus a sender handle for
     the meeting place's request box. *)
  let connect pid =
    let req_box_id = get_box_id pid in
    let (resp_box : response Netcamlbox.camlbox), resp_box_id =
      Netmcore_camlbox.create_camlbox "chameneos" 2 512 in
    let req_sender = Netmcore_camlbox.lookup_camlbox_sender req_box_id in
    { mp_req_box = req_sender;
      mp_resp_box = resp_box;
      mp_resp_box_id = resp_box_id;
    }

  (* [meet pref_slot mp_conn ch] sends a meet request for [ch] and waits
     for the answer.  Returns [Some mate] (a private copy of the mate's
     record) or [None] when all meetings are done.  [pref_slot] is
     passed both as the preferred slot and as the cell given to
     [~slot]. *)
  let meet pref_slot mp_conn ch =
    let resp_box = mp_conn.mp_resp_box in
    let msg =
      ref (Meet_request { me = ch; response_box = mp_conn.mp_resp_box_id }) in
    Netcamlbox.camlbox_send
      ~prefer:!pref_slot ~slot:pref_slot
      mp_conn.mp_req_box msg;
    match Netcamlbox.camlbox_wait resp_box with
    | [ slot ] ->
        (* No camlbox_get_copy of the answer! Instead, we make our own
           copy, which is cheaper for very simple messages.  The copy
           must exist before the slot is deleted.
         *)
        let mate_copy =
          match (Netcamlbox.camlbox_get resp_box slot).mate_opt with
          | None ->
              None
          | Some mate ->
              Some
                { mate with Chameneos_type.id = mate.Chameneos_type.id }
        in
        Netcamlbox.camlbox_delete resp_box slot;
        mate_copy
    | _ ->
        assert false
end
module Chameneos = struct
  include Chameneos_type

  (* [create color] returns a new creature of the given colour with
     zeroed counters.  Ids are taken from a counter private to this
     closure, starting at 0 and increasing by one per call. *)
  let create =
    let next_id = ref 0 in
    fun color ->
      let id = !next_id in
      next_id := id + 1;
      { id = id;
        color = color;
        meetings = 0;
        meetings_with_self = 0;
      }

  (* Argument and result of a chameneos process. *)
  type run =
    { place_pid : Netmcore.process_id;  (* the meeting place to visit *)
      chameneos : t;                    (* the creature itself *)
    }

  (* Body of a chameneos process: keep asking the meeting place for a
     mate until it answers [None], updating the counters and the colour
     after every meeting.  Returns a fresh [run] record holding the
     updated creature. *)
  let run arg =
    let me = arg.chameneos in
    let connector = Meeting_place.connect arg.place_pid in
    (* The idea of pref_slot is to avoid cache bouncing: when the
       same slot is reused by the same process, the memory of this
       slot continues to be owned by the same cache
     *)
    let pref_slot = ref 0 in
    let running = ref true in
    while !running do
      match Meeting_place.meet pref_slot connector me with
      | None ->
          running := false
      | Some mate ->
          me.meetings <- me.meetings + 1;
          if me.id = mate.id then
            me.meetings_with_self <- me.meetings_with_self + 1;
          me.color <- Color.complement me.color mate.color
    done;
    { place_pid = arg.place_pid; chameneos = me }

  let fork_chameneos, join_chameneos =
    Netmcore_process.def_process run

  (* Start a chameneos process for [arg] and return its process id. *)
  let start arg =
    Netmcore_process.start fork_chameneos arg

  (* Join the chameneos process [pid] and return its result.
     Raises [Failure "no result from chameneos"] when the join yields
     no value. *)
  let join pid =
    match Netmcore_process.join join_chameneos pid with
    | Some result -> result
    | None -> failwith "no result from chameneos"
end
module Compute = struct
  (* [work colors n] runs one game: it prints the starting colours,
     starts a meeting place configured for [n] meetings and one
     chameneos process per element of [colors], waits for all of them,
     shuts the meeting place down, and finally prints one line per
     creature (meeting count followed by the spelled-out number of
     self-meetings) and the spelled-out total of all meeting counts. *)
  let work colors n =
    Netlog.logf `Debug "Compute.work n=%d" n;
    List.iter (fun color -> printf " %s" (Color.to_string color)) colors;
    printf "\n";

    let place_pid =
      Meeting_place.start
        { Meeting_place.num_slots = 2 * List.length colors;
          meetings = n
        } in
    let `Process p = place_pid in
    Netlog.logf `Debug "Compute.work place_pid=%d" p;

    (* All creatures are created first, then all processes started. *)
    let creatures = List.map Chameneos.create colors in
    let start_one creature =
      Chameneos.start
        { Chameneos.place_pid = place_pid; chameneos = creature } in
    let pids = List.map start_one creatures in
    Netlog.logf `Debug "Compute.work started chameneos processes";

    let results = List.map Chameneos.join pids in
    Netlog.logf `Debug "Compute.work joined chamaneos processes";

    Meeting_place.shutdown place_pid;
    Meeting_place.join place_pid;
    Netlog.logf `Debug "Compute.work joined place";

    (* Print the per-creature lines while summing the meeting counts. *)
    let total_meetings =
      List.fold_left
        (fun acc res ->
           let ch = res.Chameneos.chameneos in
           printf "%d" ch.Chameneos.meetings;
           spell_int ch.Chameneos.meetings_with_self;
           printf "\n";
           acc + ch.Chameneos.meetings)
        0
        results
    in
    spell_int total_meetings;
    printf "\n%!"

  (* Body of the compute process: play one game with three creatures
     and one with ten, each for [n] meetings, printing an empty line
     (and flushing stdout) after each game. *)
  let compute n =
    let three =
      [ Color.Blue; Color.Red; Color.Yellow ] in
    let ten =
      [ Color.Blue; Color.Red; Color.Yellow; Color.Red; Color.Yellow;
        Color.Blue; Color.Red; Color.Yellow; Color.Red; Color.Blue ] in
    List.iter
      (fun colors ->
         work colors n;
         printf "\n%!")
      [ three; ten ]

  let fork_compute, join_compute =
    Netmcore_process.def_process compute

  (* Start the compute process for [n] meetings; returns its pid. *)
  let start n =
    Netmcore_process.start fork_compute n

  (* Join the compute process [pid], discarding its result. *)
  let join pid =
    ignore (Netmcore_process.join join_compute pid)
end
(* Print the full complement table, one "a + b -> c" line for every
   ordered pair of colours taken from Color.all, followed by an empty
   line; stdout is flushed at the end. *)
let print_complements () =
  let print_pair c1 c2 =
    printf "%s + %s -> %s\n"
      (Color.to_string c1)
      (Color.to_string c2)
      (Color.to_string (Color.complement c1 c2))
  in
  List.iter
    (fun c1 -> List.iter (print_pair c1) Color.all)
    Color.all;
  printf "\n%!"
(* Driver of the example.
   Reads the number of meetings from the first command-line argument
   (falling back to 600 when it is missing or not an integer), prints
   the colour complement table, registers handlers for SIGINT and
   SIGTERM that call Netmcore.destroy_resources, and then hands control
   to Netmcore.startup with the compute process as first process. *)
let main () =
  (* Netmcore.Debug.enable := true; *)
  let default_meetings = 600 in
  let n =
    (* Only the two expected failures select the default; any other
       exception is no longer silently swallowed. *)
    try
      int_of_string Sys.argv.(1)
    with
    | Invalid_argument _   (* no argument: Sys.argv.(1) out of bounds *)
    | Failure _ ->         (* argument is not a valid integer *)
        default_meetings
  in
  (* print_complements flushes stdout itself, so nothing is left in the
     output buffer when Netmcore.startup is entered. *)
  print_complements ();
  List.iter
    (fun sigcode ->
       Netsys_signal.register_handler
         ~name:"application"
         ~signal:sigcode
         ~callback:(fun _ -> Netmcore.destroy_resources())
         ~keep_default:true
         ()
    )
    [ Sys.sigint; Sys.sigterm ];
  Netmcore.startup
    ~socket_directory:"run_chameneos"
    ~init_ctrl:(fun ctrl ->
                  (* The shared-variable plugin is added here; it is
                     used by Meeting_place to publish its box id. *)
                  ctrl#add_plugin Netplex_sharedvar.plugin
                  (* ctrl#controller_config#set_max_level `Debug *)
               )
    ~first_process:(fun () -> Compute.start n)
    ()
(* Program entry point: run [main] when this module is initialised. *)
let () = main ()
|