File: chameneos.ml

package info (click to toggle)
ocamlnet 4.1.9-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 54,024 kB
  • sloc: ml: 151,939; ansic: 11,071; sh: 2,003; makefile: 1,310
file content (418 lines) | stat: -rw-r--r-- 10,780 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
(* $Id$ *)

(* Chameneos game implemented with message passing

   see
   http://cedric.cnam.fr/PUBLIS/RC474.pdf
   http://shootout.alioth.debian.org/u32/benchmark.php?test=chameneosredux&lang=all

   This solution does not claim to be fast in any way. It is just a
   coding example. In particular, message passing is not optimal here
   because the messages are very short (and thus the whole message
   copying machinery consumes a lot of setup time relative to total
   time), and the worker processes do almost nothing (Color.complement).
   So this program tests mostly the bare minimum time needed for
   synchronization, but for any real-world example it will be much better.

 *)

open Printf

(* Prints the decimal representation of [i] on stdout with every digit
   spelled out as an English word, each word preceded by a single space
   (120 is printed as " one two zero").  No newline is written and
   stdout is not flushed.

   Raises [Failure "unexpected char"] at the first character of
   [string_of_int i] that is not a decimal digit, e.g. the minus sign
   of a negative number. *)
let spell_int i =
  let words =
    [| "zero"; "one"; "two"; "three"; "four";
       "five"; "six"; "seven"; "eight"; "nine" |] in
  let word_of_digit c =
    match c with
    | '0' .. '9' -> words.(Char.code c - Char.code '0')
    | _ -> failwith "unexpected char"
  in
  let print_digit c = printf " %s" (word_of_digit c) in
  String.iter print_digit (string_of_int i)


(* The three chameneos colors and the rule by which two colors
   combine when their owners meet. *)
module Color = struct
  type t =
    | Blue
    | Red
    | Yellow

  (* Every color, in declaration order. *)
  let all = [ Blue; Red; Yellow ]

  (* Lower-case English name of a color. *)
  let to_string c =
    match c with
    | Blue -> "blue"
    | Red -> "red"
    | Yellow -> "yellow"

  (* [complement t t'] is [t] itself when both colors are equal, and
     otherwise the one color that is neither [t] nor [t'].  The cases
     are grouped by result. *)
  let complement t t' =
    match t, t' with
    | Blue, Blue | Red, Yellow | Yellow, Red -> Blue
    | Red, Red | Blue, Yellow | Yellow, Blue -> Red
    | Yellow, Yellow | Blue, Red | Red, Blue -> Yellow
end


(* The record describing one chameneos creature.  It lives in a module
   of its own so that Meeting_place can mention the type in its
   messages, while the later Chameneos module re-exports it with
   [include]. *)
module Chameneos_type = struct
  type t = {
    (* Identifier handed out by the counter inside Chameneos.create. *)
    id : int;
    (* Current color; Chameneos.run replaces it with
       Color.complement of own and partner color after each meeting. *)
    mutable color : Color.t;
    (* Number of meetings; incremented by Chameneos.run for every mate
       it is given. *)
    mutable meetings : int;
    (* Number of meetings in which the mate carried the same [id]. *)
    mutable meetings_with_self : int;
  }
end


module Meeting_place = struct
  (* In this solution the meeting place is an independent process to
     which messages are sent (i.e. no "monitor" as suggested by the
     original article)
   *)

  (* Accessor module for Netplex shared variables whose values are of
     type Netmcore.res_id.  Used below through Id_var.set / Id_var.get
     to publish the ID of the request box. *)
  module Id_var =
    Netplex_sharedvar.Make_var_type(struct type t = Netmcore.res_id end)

  (* Name of the shared variable that holds the request box ID of the
     meeting place running as process [pid]. *)
  let get_box_id_var (`Process pid) =
    sprintf "Meeting_place.box.%d" pid 

  (* Publishes [box_id] as the request box of the meeting place [pid].
     The result of create_var is ignored; presumably it only reports
     whether the variable was newly created - TODO confirm against the
     Netplex_sharedvar documentation. *)
  let set_box_id pid box_id =
    let box_id_var = get_box_id_var pid in
    ignore(Netplex_sharedvar.create_var ~enc:true box_id_var);
    Id_var.set box_id_var box_id

  (* Returns the request box ID of the meeting place [pid].  The
     variable is created here as well (result ignored), then
     wait_for_enc_value is called before the value is read;
     presumably this blocks until the meeting place has stored the ID
     - TODO confirm. *)
  let get_box_id pid =
    let box_id_var = get_box_id_var pid in
    ignore(Netplex_sharedvar.create_var ~enc:true box_id_var);
    ignore(Netplex_sharedvar.wait_for_enc_value box_id_var);
    Id_var.get box_id_var
    
  (* Start-up parameters of the meeting place process. *)
  type config =
      { num_slots : int;   (* #slots of the box *)
	(* Number of meetings to arrange; once that many pairs have been
	   matched every further request is answered with
	   [mate_opt = None]. *)
	meetings : int;
      }

  (* A creature asking for a mate. *)
  type meet_request =
      { me : Chameneos_type.t;
	(* ID of the box to which the answer has to be sent *)
	response_box : Netmcore.res_id;
      }

  (* Messages understood by the meeting place. *)
  type request =
    | Meet_request of meet_request
    | Shutdown

  type response =
      { mate_opt : Chameneos_type.t option }
	(* None = all meetings done *)
  
  (* Matching state of the server loop: either nobody is waiting, or
     one request is parked until a second one arrives. *)
  type state =
    | Empty
    | First of int * meet_request
	(* slot, request *)
	
  (* Body of the meeting place process.

     Creates the request box, publishes its ID, and then serves
     requests until a Shutdown message is seen:
     - Meet_request while meetings are left: the first request is
       parked in [state]; the second one pairs up with it, each side is
       sent the other's record, and both request slots are freed.
     - Meet_request when no meetings are left: answered with
       [mate_opt = None].
     - Shutdown: the loop ends after the current batch of messages.
     Finally the shared variable holding the box ID is deleted. *)
  let meeting_place config =
    let (box : request ref Netcamlbox.camlbox), box_id =
      Netmcore_camlbox.create_camlbox "chameneos" config.num_slots 512 in
    (* 512: just an upper limit for the message size *)

    (* Put the ID of the box into a global variable, so all workers
       can get it from there
     *)
    let pid = Netmcore.self_process_id() in
    set_box_id pid box_id;

    (* Cache of sender handles, keyed by the integer inside the
       `Resource ID of a response box, so that each box is looked up
       only once. *)
    let response_boxes = Hashtbl.create 29 in
    let get_response_box id =
      try
	Hashtbl.find response_boxes id 
      with Not_found ->
	let b = Netmcore_camlbox.lookup_camlbox_sender (`Resource id) in
	Hashtbl.add response_boxes id b;
	(b : response Netcamlbox.camlbox_sender)
    in

    let meetings_left = ref config.meetings in
    let state = ref Empty in
    let live = ref true in
    while !live do
      (* camlbox_wait returns the list of slots to process; presumably
         it blocks until at least one message has arrived - TODO
         confirm against the Netcamlbox documentation. *)
      let req_slots = Netcamlbox.camlbox_wait box in
      List.iter
	(fun req_slot ->
	   let req = !(Netcamlbox.camlbox_get box req_slot) in
	   (* no copy! So be careful... *)
	   (* NOTE(review): [req] presumably points into the slot itself,
	      so it must not be used after camlbox_delete on that slot.
	      This is why a parked request keeps its slot number. *)
	   match req with
	     | Meet_request mreq ->
		 if !meetings_left > 0 then (
		   match !state with
		     | Empty ->
			 (* Nobody waiting: park this request, slot stays
			    occupied. *)
			 state := First(req_slot,mreq)
		     | First(first_slot,first_req) ->
			 (* A pair is complete: tell each creature about
			    the other one. *)
			 let `Resource r1 = first_req.response_box  in
			 let `Resource r2 = mreq.response_box in
			 let fst_box = 
			   get_response_box r1 in
			 Netcamlbox.camlbox_send 
			   fst_box { mate_opt = Some mreq.me };
			 let snd_box = 
			   get_response_box r2 in
			 Netcamlbox.camlbox_send 
			   snd_box { mate_opt = Some first_req.me };
			 decr meetings_left;
			 state := Empty;
			 (* Both slots are released only after both answers
			    have been sent, because the answers were built
			    from data taken out of these slots. *)
			 Netcamlbox.camlbox_delete box first_slot;
			 Netcamlbox.camlbox_delete box req_slot
		 )
		 else (
		   (* No meetings left.  The box ID [r] is extracted
		      before the slot is deleted; the answer itself does
		      not refer to the request. *)
		   let `Resource r = mreq.response_box in
		   let r_box = get_response_box r in
		   Netcamlbox.camlbox_delete box req_slot;
		   Netcamlbox.camlbox_send r_box { mate_opt = None };
		 )
	     | Shutdown ->
		 Netlog.logf `Debug "Got shutdown request";
		 Netcamlbox.camlbox_delete box req_slot;
		 (* Only clears the flag: the remaining slots of this
		    batch are still processed by List.iter. *)
		 live := false
	)
	req_slots
    done;
    let box_id_var = get_box_id_var pid in
    ignore(Netplex_sharedvar.delete_var box_id_var)


  (* Registers [meeting_place] as a process body; the two results are
     the handles passed to Netmcore_process.start and
     Netmcore_process.join below. *)
  let fork_meeting_place, join_meeting_place =
    Netmcore_process.def_process meeting_place

  (* Starts a meeting place process with [config] and returns the
     result of Netmcore_process.start (its process ID). *)
  let start config =
    Netmcore_process.start fork_meeting_place config

  (* Joins the meeting place process [pid]; its result is dropped. *)
  let join pid =
    ignore(Netmcore_process.join join_meeting_place pid)

  (* Sends a Shutdown message to the request box of the meeting place
     [pid].  Does not wait for the process to finish; use [join] for
     that. *)
  let shutdown pid =
    let req_box_id = get_box_id pid in
    let b = Netmcore_camlbox.lookup_camlbox_sender req_box_id in
    Netcamlbox.camlbox_send b (ref Shutdown)

  (* Everything a creature needs for talking to the meeting place. *)
  type connector =
      { (* sender handle for the request box of the meeting place *)
	mp_req_box : request ref Netcamlbox.camlbox_sender;
	(* the creature's own box, where answers arrive *)
	mp_resp_box : response Netcamlbox.camlbox;
	(* ID of [mp_resp_box], included in every request *)
	mp_resp_box_id : Netmcore.res_id;
      }

  (* Connects the calling process to the meeting place [pid]: looks up
     the request box and creates a private response box with 2 slots
     and a message size limit of 512. *)
  let connect pid =
    let req_box_id = get_box_id pid in
    let (r_box : response Netcamlbox.camlbox), r_box_id =
      Netmcore_camlbox.create_camlbox "chameneos" 2 512 in
    { mp_req_box = Netmcore_camlbox.lookup_camlbox_sender req_box_id;
      mp_resp_box = r_box;
      mp_resp_box_id = r_box_id;
    }
    
  (* [meet pref_slot mp_conn ch] sends a Meet_request for [ch] and
     waits for the answer.  Returns [Some mate] with a private copy of
     the mate's record, or [None] when the meeting place answered that
     all meetings are done.

     [pref_slot] is passed both as the preferred slot (~prefer) and as
     the cell for ~slot; presumably camlbox_send stores the slot it
     actually used there, so that the next call prefers the same slot
     - TODO confirm against the Netcamlbox documentation.

     Exactly one filled response slot is expected; anything else hits
     [assert false]. *)
  let meet pref_slot mp_conn ch =
    let req = { me = ch; response_box = mp_conn.mp_resp_box_id } in
    Netcamlbox.camlbox_send
      ~prefer:!pref_slot ~slot:pref_slot
      mp_conn.mp_req_box (ref (Meet_request req));
    match Netcamlbox.camlbox_wait mp_conn.mp_resp_box with
      | [ slot ] ->
	  let r =
	    (Netcamlbox.camlbox_get mp_conn.mp_resp_box slot).mate_opt in
	  (* No camlbox_get_copy of r! Instead, we make our own copy, which
	     is cheaper for very simple messages
	   *)
	  let r_copy =
	    match r with
	      | None -> 
		  None
	      | Some ch -> 
		  (* [{ ch with ... }] builds a fresh record, so the
		     result no longer refers to the slot that is deleted
		     right below. *)
		  Some 
		    { ch with Chameneos_type.id = ch.Chameneos_type.id } in
	  Netcamlbox.camlbox_delete mp_conn.mp_resp_box slot;
	  r_copy
      | _ ->
	  assert false
end


(* A chameneos creature: construction of the record plus the worker
   process that keeps meeting other creatures at the meeting place. *)
module Chameneos = struct
  include Chameneos_type

  (* [create color] returns a fresh creature of the given color with
     zeroed counters.  IDs are consecutive, starting at 0, in the order
     of the calls made within one process. *)
  let create =
    let next_id = ref 0 in
    fun color ->
      let fresh = !next_id in
      incr next_id;
      { id = fresh; color; meetings = 0; meetings_with_self = 0 }

  (* Argument and result of the worker process. *)
  type run =
      { place_pid : Netmcore.process_id;
        chameneos : t;
      }

  (* Body of the worker process: connects to the meeting place
     [arg.place_pid] and asks for mates until the answer is [None].
     For every mate the meeting counters and the color of the creature
     are updated in place.  Returns [arg] with the updated creature. *)
  let run arg =
    let me = arg.chameneos in
    let conn = Meeting_place.connect arg.place_pid in
    (* Slot hint handed to every Meeting_place.meet call.  Reusing the
       same slot from the same process is meant to keep the slot's
       memory in the same cache and so avoid cache bouncing. *)
    let slot_hint = ref 0 in
    let finished = ref false in
    while not !finished do
      match Meeting_place.meet slot_hint conn me with
      | None ->
        finished := true
      | Some partner ->
        me.meetings <- me.meetings + 1;
        if me.id = partner.id then
          me.meetings_with_self <- me.meetings_with_self + 1;
        me.color <- Color.complement me.color partner.color
    done;
    { arg with chameneos = me }

  let fork_chameneos, join_chameneos =
    Netmcore_process.def_process run

  (* Starts a worker process for [arg]. *)
  let start arg =
    Netmcore_process.start fork_chameneos arg

  (* Joins the worker [pid] and returns its result; fails with
     [Failure "no result from chameneos"] when there is none. *)
  let join pid =
    match Netmcore_process.join join_chameneos pid with
    | Some result -> result
    | None -> failwith "no result from chameneos"
end


(* Driver process: runs complete games and prints the results. *)
module Compute = struct
  (* [work colors n] plays one game: one creature per element of
     [colors], [n] meetings in total.  Prints the starting colors, then
     one line per creature (number of meetings followed by the spelled
     out number of meetings with itself), then the spelled out total of
     all meetings. *)
  let work colors n =
    Netlog.logf `Debug "Compute.work n=%d" n;

    let print_color c = printf " %s" (Color.to_string c) in
    List.iter print_color colors;
    printf "\n";

    let place_pid =
      Meeting_place.start
        { Meeting_place.num_slots = 2 * List.length colors;
          meetings = n;
        } in
    let `Process place_no = place_pid in
    Netlog.logf `Debug "Compute.work place_pid=%d" place_no;

    (* All creatures are created first, then all workers are started. *)
    let creatures = List.map Chameneos.create colors in
    let spawn creature =
      Chameneos.start
        { Chameneos.place_pid = place_pid; chameneos = creature } in
    let worker_pids = List.map spawn creatures in
    Netlog.logf `Debug "Compute.work started chameneos processes";

    let results = List.map Chameneos.join worker_pids in
    Netlog.logf `Debug "Compute.work joined chamaneos processes";

    (* The meeting place is only stopped after every worker is done. *)
    Meeting_place.shutdown place_pid;
    Meeting_place.join place_pid;
    Netlog.logf `Debug "Compute.work joined place";

    (* Prints the line of one creature and adds its meetings to the
       running total. *)
    let report total res =
      let creature = res.Chameneos.chameneos in
      printf "%d" creature.Chameneos.meetings;
      spell_int creature.Chameneos.meetings_with_self;
      printf "\n";
      total + creature.Chameneos.meetings
    in
    let sum_meets = List.fold_left report 0 results in
    spell_int sum_meets;
    printf "\n%!"


  (* Body of the compute process: plays a game with three creatures and
     one with ten, each with [n] meetings and each followed by an empty
     line. *)
  let compute n =
    let small_game = [ Color.Blue; Color.Red; Color.Yellow ] in
    let large_game =
      [ Color.Blue; Color.Red; Color.Yellow; Color.Red; Color.Yellow;
        Color.Blue; Color.Red; Color.Yellow; Color.Red; Color.Blue ] in
    List.iter
      (fun colors ->
         work colors n;
         printf "\n%!")
      [ small_game; large_game ]

  let fork_compute, join_compute =
    Netmcore_process.def_process compute

  (* Starts the compute process for [n] meetings per game. *)
  let start n =
    Netmcore_process.start fork_compute n

  (* Joins the compute process [pid], dropping its result. *)
  let join pid =
    ignore(Netmcore_process.join join_compute pid)

end


(* Prints the complete color table, one line "c1 + c2 -> c3" for every
   ordered pair of colors taken from Color.all, followed by an empty
   line; stdout is flushed at the end. *)
let print_complements () =
  let show_pair c1 c2 =
    printf "%s + %s -> %s\n"
      (Color.to_string c1)
      (Color.to_string c2)
      (Color.to_string (Color.complement c1 c2))
  in
  let show_row c1 = List.iter (show_pair c1) Color.all in
  List.iter show_row Color.all;
  printf "\n%!"
;;

(* Program entry point.

   Reads the number of meetings per game from the first command line
   argument (600 when the argument is missing or not an integer),
   prints the color table, installs handlers for SIGINT and SIGTERM
   that call Netmcore.destroy_resources, and finally starts the
   Netmulticore machinery with the compute process as first process.
   The socket directory is "run_chameneos". *)
let main () = 
  (* Netmcore.Debug.enable := true; *)
  let n = 
    (* Only the two expected failures select the default: a missing
       argument (Invalid_argument from the array access) and a
       non-numeric argument (Failure from int_of_string).  Any other
       exception is no longer swallowed. *)
    try 
      int_of_string (Sys.argv.(1))
    with
    | Failure _ | Invalid_argument _ -> 600
  in
  print_complements ();
  (* Netmcore.Debug.enable := true; *)
  List.iter
    (fun sigcode ->
       Netsys_signal.register_handler 
         ~name:"application"
         ~signal:sigcode 
         ~callback:(fun _ -> Netmcore.destroy_resources())
         ~keep_default:true
         ()
    )
    [ Sys.sigint; Sys.sigterm ];
  Netmcore.startup
    ~socket_directory:"run_chameneos"
    ~init_ctrl:(fun ctrl -> 
                  (* Meeting_place publishes its box ID through
                     Netplex_sharedvar, hence the plugin. *)
                  ctrl#add_plugin Netplex_sharedvar.plugin;
                  (* ctrl#controller_config#set_max_level `Debug *)
               )
    (* Compute.start is the module's own wrapper around
       Netmcore_process.start Compute.fork_compute. *)
    ~first_process:(fun () -> Compute.start n)
    ()
;;

(* Run [main] when the program is loaded; the [()] pattern checks that
   [main] returns unit. *)
let () = main ()