File: sort2_controller.ml

package info (click to toggle)
ocamlnet 4.1.9-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 54,024 kB
  • sloc: ml: 151,939; ansic: 11,071; sh: 2,003; makefile: 1,310
file content (413 lines) | stat: -rw-r--r-- 11,062 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
(* Implementation of the controller process *)

open Sort2_proto_aux
open Printf

(* Process-wide cache of RPC clients, filled lazily by get_worker_client
   so that repeated requests to the same endpoint reuse one client. *)
let worker_clients = Hashtbl.create 10
  (* Maps endpoint names to RPC clients *)


let get_worker_endpoints n_workers =
  (* Returns an array with exactly n_workers endpoints looked up via
     Netplex_cenv.lookup_container_sockets. As long as fewer endpoints
     are found, a warning is logged, the process sleeps for one second
     (this blocks the caller), and the lookup is repeated.
   *)
  let rec poll () =
    let found =
      Netplex_cenv.lookup_container_sockets "sort_worker" "Worker" in
    let n_found = Array.length found in
    if n_found >= n_workers then (
      Netlog.logf `Info
        "Found %d endpoints, using the first %d"
        n_found n_workers;
      (* Surplus endpoints are simply ignored *)
      Array.sub found 0 n_workers
    )
    else (
      Netlog.logf `Warning
        "Not enough workers found in registry (only %d)- will retry in 1 second"
        n_found;
      Unix.sleep 1;
      poll ()
    )
  in
  poll ()


let get_worker_client esys endpoint =
  (* Returns the RPC client for [endpoint]. The client is created on
     first use (attached to the event system [esys]) and then kept in
     worker_clients, so later calls for the same endpoint get the very
     same client back.
   *)
  if Hashtbl.mem worker_clients endpoint then
    Hashtbl.find worker_clients endpoint
  else (
    let connector =
      Netplex_sockserv.any_file_client_connector endpoint in
    let mode =
      `Socket(Rpc.Tcp, connector, Rpc_client.default_socket_config) in
    let new_client =
      Sort2_proto_clnt.Worker.V1.create_client2 ~esys mode in
    Hashtbl.replace worker_clients endpoint new_client;
    new_client
  )


let get_subarray_info n_workers n =
  (* Splits the index range 0..n-1 into n_workers consecutive slices and
     returns, per slice, the pair (start index, length). The lengths
     differ by at most one: the first (n mod n_workers) slices get the
     extra element. Raises Division_by_zero if n_workers = 0.
   *)
  let base_len = n / n_workers in
  let extra = n mod n_workers in
  Array.init
    n_workers
    (fun i ->
       (* Number of slices before slice i that are one element longer.
          The max only matters for a negative n, where no slice is longer.
        *)
       let longer_before = min i (max extra 0) in
       let start = i * base_len + longer_before in
       let len = if i < extra then base_len + 1 else base_len in
       (start, len)
    )


(* Registry of the shared memory objects created by this process.
   sort_subarray adds entries, free_shm removes them, and free_all
   walks the table to release whatever is still registered. *)
let shm_tbl = Hashtbl.create 10
  (* maps the shm name to the pair (shm descriptor, mapped memory) *)


let rec shm_open_excl pid n =
  (* Creates a new shared memory object named "/sort2_<pid>_<n>" with
     exclusive-create semantics and mode 0o600. If that name is already
     taken (EEXIST), the next higher n is tried. Returns the name that
     finally worked together with the open descriptor. Other Unix errors
     are passed through.
   *)
  let name = sprintf "/sort2_%d_%d" pid n in
  let flags =
    [ Netsys_posix.SHM_O_RDWR;
      Netsys_posix.SHM_O_CREAT;
      Netsys_posix.SHM_O_EXCL
    ] in
  let attempt =
    try Some (Netsys_posix.shm_open name flags 0o600)
    with Unix.Unix_error(Unix.EEXIST,_,_) -> None in
  match attempt with
    | Some fd -> (name, fd)
    | None -> shm_open_excl pid (n+1)


let shm_open_existing shm =
  (* Opens the already existing shared memory object shm.shm_name
     read/write, maps it, and returns the mapping. The size argument -1
     is passed to Netsys_mem.memory_map_file unchanged (presumably
     "map the whole object" - confirm against the Netsys_mem docs).

     The descriptor is only needed for setting up the mapping, so it is
     closed in any case - also when mapping fails; otherwise every
     failed attempt would leak a descriptor.
   *)
  let fd =
    Netsys_posix.shm_open
      shm.shm_name
      [Netsys_posix.SHM_O_RDWR]
      0o600 in
  let mem =
    try
      Netsys_mem.memory_map_file fd true (-1)
    with
      | error ->
          (* Do not let a failing close hide the original error *)
          (try Unix.close fd with Unix.Unix_error(_,_,_) -> ());
          raise error in
  Unix.close fd;
  mem


let free_shm ?(immediate_unmap=false) shm =
  (* Releases a shared memory object registered in shm_tbl: the name is
     unlinked and the table entry is dropped. The mapping itself is only
     removed right away if immediate_unmap is set - this is only safe
     when nothing refers to data inside the mapping anymore.
     Fails with Failure if the object is not registered.
   *)
  let name = shm.shm_name in
  if not (Hashtbl.mem shm_tbl name) then
    failwith("Shared mem object not found: " ^ name);
  let (_, mapping) = Hashtbl.find shm_tbl name in
  Netsys_posix.shm_unlink name;
  if immediate_unmap then
    Netsys_mem.memory_unmap_file mapping;   (* dangerous ! *)
  Hashtbl.remove shm_tbl name


let free_all() =
  (* Releases every shared memory object that is still registered.
     The descriptors are collected into a list first because free_shm
     modifies shm_tbl, which must not happen while the table is being
     traversed.
   *)
  let collect _name (shm, _mem) acc = shm :: acc in
  let registered = Hashtbl.fold collect shm_tbl [] in
  List.iter (fun shm -> free_shm shm) registered


(* A single page of memory, allocated once when the module is loaded.
   sort_subarray passes it as the target of the size-measuring
   Netsys_mem.init_value call (the one with Copy_simulate). *)
let dummy_mem =
  Netsys_mem.alloc_memory_pages 1


let sort_subarray esys worker_endpoint data (k,l) when_done when_error =
  (* Lets the worker at worker_endpoint sort the slice of data that
     starts at index k and has length l.

     The following is written as an asynchronous sequence of RPC calls:
     alloc_shm (worker allocates a buffer), sort_shm (worker sorts the
     value copied into that buffer), copy_shm (worker copies the result
     into the controller buffer), free_shm (worker releases its buffer).

     On success, finally [when_done sorted cleanup] is called, where
     [sorted] is the sorted slice living inside the controller's shared
     memory, and [cleanup] is a function releasing that memory. On error,
     [when_error exn] is called.

     NOTE(review): the error handlers below only call when_error; none of
     them releases ctrl_shm or asks the worker to free worker_shm. Also,
     when_done is invoked inside the innermost try, so an exception
     raised by when_done itself is reported through when_error as well.
     Failures before the first RPC is issued (e.g. in shm_open_excl) are
     raised synchronously to the caller instead.
   *)

  let sdata = Array.sub data k l in   (* TODO: avoid this copy *)
  Netlog.logf `Info
    "sort_subarray k=%d l=%d" k l;
  
  (* First find out how large sdata is: only the byte length returned by
     init_value is used here, and dummy_mem is just one page (presumably
     Copy_simulate means nothing is actually written - confirm against
     the Netsys_mem docs).
   *)
  let _,sdata_bytelen =
    Netsys_mem.init_value
      dummy_mem 0 sdata [Netsys_mem.Copy_simulate] in
  Netlog.logf `Info "first init_value done bytelen=%d" sdata_bytelen;
  
  (* Alloc the buffer in the controller: a fresh shm object of
     sdata_bytelen bytes. The descriptor is closed as soon as the
     mapping exists, and the object is registered in shm_tbl so that
     free_shm/free_all can find it later.
   *)
  let ctrl_mem_name, ctrl_mem_fd =
    shm_open_excl (Unix.getpid()) 0 in
  let ctrl_mem =
    Netsys_mem.memory_map_file ctrl_mem_fd true sdata_bytelen in
  Unix.close ctrl_mem_fd;
  let ctrl_shm =
    { shm_name = ctrl_mem_name;
      shm_addr = Int64.of_nativeint(Netsys_mem.memory_address ctrl_mem)
    } in
  Hashtbl.replace shm_tbl ctrl_mem_name (ctrl_shm, ctrl_mem);
  Netlog.logf `Info "addr=%Lx" ctrl_shm.shm_addr;

  (* Alloc the buffer in the worker: *)
  let client = get_worker_client esys worker_endpoint in
  Sort2_proto_clnt.Worker.V1.alloc_shm'async
    client
    (Int64.of_int sdata_bytelen)
    (fun alloc_shm_reply ->
       try
	 (* Calling the reply function yields the result or raises the
            error of the RPC call *)
	 let worker_shm = alloc_shm_reply() in
	 (* Got this buffer. Now copy the subarray to this buffer.
            targetaddr is the address reported by the worker (presumably
            so that pointers inside the copy are valid in the worker's
            address space - confirm against the Netsys_mem docs).
            Note that worker_mem is not unmapped explicitly anywhere in
            this function.
	  *)
	 let worker_mem = shm_open_existing worker_shm in
	 let worker_offset,_ =
	   Netsys_mem.init_value
	     ~targetaddr:(Int64.to_nativeint worker_shm.shm_addr)
	     worker_mem 0 sdata [] in
	 Netlog.logf `Info
	   "Second init_value done";
	 (* Request the sort: *)
	 Sort2_proto_clnt.Worker.V1.sort_shm'async
	   client
	   (worker_shm,worker_offset)
	   (fun sort_shm_reply ->
	      try
		let () = sort_shm_reply() in   (* check for errors! *)
		(* Copy the sorted data to the controller buffer, so we can
                   access them
		 *)
		Sort2_proto_clnt.Worker.V1.copy_shm'async
		  client
		  (worker_shm,worker_offset,ctrl_shm)
		  (fun copy_shm_reply ->
		     try
		       Netlog.logf `Info "Got data in ctrl buffer";
		       let ctrl_offset = copy_shm_reply() in
		       (* Get the sorted subarray: the value is not copied
                          but viewed in place inside ctrl_mem
			*)
		       let sdata_sorted =
			 Netsys_mem.as_value ctrl_mem ctrl_offset in
		       (* NOTE(review): presumably announces ctrl_mem to the
                          runtime as a region containing values - confirm
                          against the Netsys_mem docs
			*)
		       Netsys_mem.value_area ctrl_mem;
		       (* Hand over the result together with the function
                          that releases (and unmaps) the controller buffer
			*)
		       when_done 
			 sdata_sorted
			 (fun () -> free_shm ~immediate_unmap:true ctrl_shm);
		       (* Do some cleanup. Note that we assume now that
                          we don't access sdata_sorted any longer! 
                          The reply of free_shm is ignored.
			*)
		       Sort2_proto_clnt.Worker.V1.free_shm'async
			 client
			 (worker_shm)
			 (fun _ -> ())
		     with
		       | error ->
			   when_error error
		  )
	      with
		| error ->
		    when_error error
	   )
       with
	 | error ->
	     when_error error
    )
  
exception Merge_exit
  (* Raised inside merge_into_1 when the output array is full *)
exception Merge_done_array of int
  (* Raised inside merge_into_1 when the input array with the given
     index has been consumed completely *)


let rec merge_into_0 out k_out in_arrays k_in =
  (* Merges the sorted arrays !(in_arrays.(i)), each read from position
     k_in.(i) on, into [out], writing from position !k_out on. k_out is
     advanced by the number of elements written. k_in is scratch state:
     it may be modified, and its contents are meaningless afterwards.

     Inputs without unread elements are dropped before merging.
     merge_into_1 looks at the head element of every input, so an input
     that is empty from the start (this happens when there are fewer
     elements than workers, or no elements at all) would otherwise
     trigger an out-of-bounds access.

     Fails with Assert_failure if in_arrays is empty.
   *)
  let l_in = Array.length in_arrays in

  assert(l_in > 0);

  (* Indices of the inputs that still have unread elements, ascending,
     so that the relative order of the inputs is kept *)
  let live = ref [] in
  for i = l_in - 1 downto 0 do
    if k_in.(i) < Array.length !(in_arrays.(i)) then
      live := i :: !live
  done;
  let in_arrays, k_in =
    if List.length !live = l_in then
      (in_arrays, k_in)         (* common case: nothing to drop, no copy *)
    else (
      let idx = Array.of_list !live in
      (Array.map (fun i -> in_arrays.(i)) idx,
       Array.map (fun i -> k_in.(i)) idx)
    ) in

  match Array.length in_arrays with
    | 0 ->
        (* Everything is consumed already *)
        ()
    | 1 ->
        (* Only one input left: copy its remainder in one go *)
        let a = !(in_arrays.(0)) in
        let l = Array.length a - k_in.(0) in
        Array.blit a k_in.(0) out !k_out l;
        k_out := !k_out + l
    | _ ->
        merge_into_1 out k_out in_arrays k_in

and merge_into_1 out k_out in_arrays k_in =
  (* at least two in_arrays, and every one of them has at least one
     unread element (merge_into_0 ensures this)
   *)
  let l_in = Array.length in_arrays in
  let l_out = Array.length out in
  try
    while true do
      (* Find the smallest in_array element. On ties the input with the
         lowest index wins because the comparison is strict. *)
      let smallest_v = ref !(in_arrays.(0)).(k_in.(0)) in
      let smallest_p = ref 0 in
      for i = 1 to l_in-1 do
        let v = !(in_arrays.(i)).(k_in.(i)) in
        if v < !smallest_v then (
          smallest_v := v;
          smallest_p := i
        )
      done;
      out.( !k_out ) <- !smallest_v;
      incr k_out;
      if !k_out = l_out then raise Merge_exit;
      let k_new = k_in.( !smallest_p ) + 1 in
      k_in.( !smallest_p ) <- k_new;
      if k_new = Array.length !(in_arrays.( !smallest_p )) then
        raise (Merge_done_array !smallest_p)
    done
  with
    | Merge_exit ->
        (* The output array is full *)
        ()
    | Merge_done_array p ->
        (* We are done with in_array p. Remove it from our structure *)
        let in_arrays' =
          Array.init
            (Array.length in_arrays - 1)
            (fun i ->
               if i < p then in_arrays.(i) else in_arrays.(i+1)
            ) in
        let k_in' =
          Array.init
            (Array.length k_in - 1)
            (fun i ->
               if i < p then k_in.(i) else k_in.(i+1)
            ) in
        merge_into_0 out k_out in_arrays' k_in'


let merge_into out in_arrays =
  (* Merges the sorted arrays found in the ref cells of in_arrays into
     [out], starting at index 0, and logs how long this took. It is
     checked by assertion that there is at least one input and that the
     merge filled [out] completely.
   *)
  let started = Unix.gettimeofday() in
  Netlog.logf `Info "merge_into";
  let n_inputs = Array.length in_arrays in
  assert(n_inputs > 0);
  (* Write position in out, and one read position per input *)
  let out_pos = ref 0 in
  let in_pos = Array.make n_inputs 0 in
  merge_into_0 out out_pos in_arrays in_pos;
  assert (!out_pos = Array.length out);
  let elapsed = Unix.gettimeofday() -. started in
  Netlog.logf `Info "time for merge_into: %f" elapsed


let sort esys data worker_endpoints when_done =
  (* sorts asynchronously. Calls [when_done] with [Some sorted_data] on
     success, or [when_done None] on error. The sorted elements are
     written back into [data] itself.

     The data is split into one slice per worker endpoint (see
     get_subarray_info), every slice is sorted by its worker via
     sort_subarray, and when the last slice has arrived, the sorted
     slices are merged into [data].

     runcounter is the number of slices for which neither of the two
     callbacks has been invoked yet. errorflag is set as soon as one
     slice failed; the final reply is nevertheless delayed until all
     slices are accounted for.
   *)
  let n_workers = Array.length worker_endpoints in
  let subarray_info =
    get_subarray_info n_workers (Array.length data) in
  let to_merge =
    Array.make n_workers (ref [| |]) in

  let runcounter = ref 0 in
  let errorflag = ref false in

  Array.iteri
    (fun p (k,l) ->
       let worker_endpoint = worker_endpoints.(p) in
       sort_subarray
         esys worker_endpoint data (k,l)
         (fun sdata_sorted cleanup ->  (* when_done *)
            decr runcounter;
            (* Note that sdata_sorted is only valid memory until [cleanup]
               is called. Be careful with that! We cannot attach a finaliser
               to sdata_sorted directly (it is living outside of the heap)
               but with some care the ref cell around it will work.

               Note that there is a ref to the bigarray in the cleanup
               closure, and this ref prevents that the shm is unmapped.

               The finaliser is attached in any case, i.e. also when
               another slice has already failed and this result is going
               to be dropped. Otherwise [cleanup] would never run for
               such a slice, and its shared memory would stay allocated
               until free_all.
             *)
            let sdata_sorted_cell = ref sdata_sorted in
            Gc.finalise
              (fun _ -> Netlog.logf `Info "Cleanup"; cleanup())
              sdata_sorted_cell;
            if !errorflag then (
              (* The result is not needed anymore. Only reply if this
                 was the last outstanding slice. *)
              if !runcounter = 0 then (
                when_done None
              )
            )
            else (
              to_merge.(p) <- sdata_sorted_cell;
              (* If all subarrays have arrived, start the merge: *)
              if !runcounter = 0 then (
                (* A failing merge must be turned into an error reply
                   here. If the exception escaped, sort_subarray would
                   pass it to the error callback below, which would
                   count this slice a second time, and when_done would
                   never be called.
                 *)
                let merged =
                  try
                    merge_into data to_merge;
                    true
                  with
                    | error ->
                        Netlog.logf `Err
                          "Merge failed: %s" (Netexn.to_string error);
                        false in
                (* and reply: *)
                when_done (if merged then Some data else None)
              )
            )
         )
         (fun error ->         (* when_error *)
            decr runcounter;
            Netlog.logf `Err
              "Got exception: %s" (Netexn.to_string error);
            errorflag := true;
            if !runcounter = 0 then (
              when_done None
            )
         );
       (* Counted after the request has been started. This relies on the
          callbacks not being invoked before sort_subarray returns. *)
       incr runcounter
    )
    subarray_info


(* Answers the RPC call belonging to [session] with the error
   Rpc.System_err instead of a regular result. Used by proc_sort when
   sorting failed. *)
let emit_error session =
  Rpc_server.reply_error session Rpc.System_err


let proc_sort n_workers session (data,sort_flag) emit =
  (* Implementation of the "sort" procedure. Without sort_flag the data
     is sent back as it came in. Otherwise the data is sorted with the
     help of n_workers workers, and either the sorted data or - if the
     sort failed - a system error is replied.

     Note that get_worker_endpoints blocks until enough workers are
     known.
   *)
  if not sort_flag then
    emit data
  else (
    let container = Netplex_cenv.self_cont() in
    let esys = container # event_system in
    let worker_endpoints =
      get_worker_endpoints n_workers in
    let reply outcome =
      match outcome with
        | Some sorted_data -> emit sorted_data
        | None -> emit_error session in
    sort esys data worker_endpoints reply
  )


let proc_get_workers n_workers session () emit =
  (* Implementation of the "get_workers" procedure: replies with the
     endpoints of the first n_workers workers. Blocks (inside
     get_worker_endpoints) until that many workers are known. The
     session is not needed here.
   *)
  emit (get_worker_endpoints n_workers)



let configure cf addr =
  (* Extracts the custom configuration of the service: the integer
     parameter "n_workers" found at [addr] in the config file [cf].
     If the lookup raises Not_found, one worker is assumed. The chosen
     value is logged and returned.
   *)
  let n_workers =
    try
      let param_addr = cf # resolve_parameter addr "n_workers" in
      cf # int_param param_addr
    with
      | Not_found -> 1 in
  Netlog.logf `Info
    "Using param n_workers=%d" n_workers;
  n_workers



let setup srv n_workers =
  (* Binds the procedures of the controller interface (version 1) to the
     RPC server [srv]. n_workers is the value returned by [configure].
   *)
  let proc_null _ _ emit =
    (* The null procedure just replies immediately *)
    emit () in
  Sort2_proto_srv.Interface.V1.bind_async
    ~proc_null
    ~proc_sort:(proc_sort n_workers)
    ~proc_get_workers:(proc_get_workers n_workers)
    srv

let controller_factory() =
  (* Creates the Netplex processor factory named "sort" for the
     controller. In addition to [configure] and [setup], hooks are
     installed that
      - run a major GC cycle once per second (presumably so that the
        finalisers releasing shared memory are run in time),
      - release all shared memory on SIGINT and SIGTERM, and also right
        before the container finishes.
   *)
  let cleanup_signals = [ Sys.sigint; Sys.sigterm ] in
  let install_cleanup_handler signo =
    (* keep_default: the default action of the signal is not replaced *)
    Netsys_signal.register_handler
      ~name:"Sort2_controller"
      ~signal:signo
      ~callback:(fun _ -> free_all())
      ~keep_default:true
      () in
  let hooks _ =
    object
      inherit Netplex_kit.empty_processor_hooks()
      method post_start_hook _ =
        (* The timer callback returns true, i.e. the timer is restarted
           each time. The timer handle itself is not needed. *)
        ignore
          (Netplex_cenv.create_timer
             (fun _ ->
                Gc.major();
                true
             )
             1.0);
        List.iter install_cleanup_handler cleanup_signals
      method pre_finish_hook _ =
        free_all()
    end in
  Rpc_netplex.rpc_factory
    ~name:"sort"
    ~configure
    ~setup
    ~hooks
    ()