File: mm_controller.ml

package info (click to toggle)
ocamlnet 4.1.9-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 54,024 kB
  • sloc: ml: 151,939; ansic: 11,071; sh: 2,003; makefile: 1,310
file content (201 lines) | stat: -rw-r--r-- 4,724 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
(* Implementation of the controller/multiplier *)

open Mm_proto_aux

(* The state is kept in global variables for simplicity. This means we can
   only do one multiplication at a time.
 *)

(* The two operand matrices as a pair [(left, right)]. [proc_test_multiply]
   stores them here and clears the reference again when all workers are
   done; [with_matrix] reads it. [None] means no matrices are stored. *)
let current_matrices = ref None

(* The remaining work, as [Some (lc, lc_max, rr, rr_max)]: two counters
   together with their upper bounds (see [incr_queue] and [pull]).
   [None] means there is no job left to hand out; it is also the initial
   value. *)
let current_job_queue = ref None

(* The matrix that [proc_put_results] writes incoming values into.
   [None] means there is no result matrix, and incoming results are
   dropped. *)
let current_result = ref None


(* [with_matrix f] applies [f] to the left and the right operand matrix
   stored in [current_matrices] and returns the result of [f].
   Raises [Failure "No matrix"] when no matrices are stored. *)
let with_matrix f =
  match !current_matrices with
    | Some (lmat, rmat) -> f lmat rmat
    | None -> failwith "No matrix"

(* [proc_get_dim which] returns the dimensions of the left operand matrix
   when [which = left], and of the right operand matrix otherwise.
   The column count is read off the first row; a matrix without any row
   is reported with 0 columns. Fails like [with_matrix] when no matrices
   are stored. *)
let proc_get_dim which =
  let dim_of mat =
    let nrows = Array.length mat in
    let ncols =
      match nrows with
        | 0 -> 0
        | _ -> Array.length mat.(0) in
    { rows = nrows; columns = ncols } in
  with_matrix
    (fun lmat rmat -> dim_of (if which = left then lmat else rmat))

(* [proc_get_row (which,row)] returns row number [row] of the left operand
   matrix when [which = left], and of the right operand matrix otherwise.
   An out-of-range [row] makes the array access raise [Invalid_argument];
   fails like [with_matrix] when no matrices are stored. *)
let proc_get_row (which,row) =
  let select lmat rmat =
    let chosen = if which = left then lmat else rmat in
    chosen.(row) in
  with_matrix select

(* [incr_queue (lc, lc_max, rr, rr_max)] advances the job counters by one
   step. [lc] is incremented first; when it reaches [lc_max] it wraps to 0
   and [rr] is incremented instead. Returns [None] once [rr] reaches
   [rr_max], i.e. when the position just consumed was the last one. *)
let incr_queue (lc, lc_max, rr, rr_max) =
  let next_lc = succ lc in
  if next_lc <> lc_max then
    Some (next_lc, lc_max, rr, rr_max)
  else
    let next_rr = succ rr in
    if next_rr <> rr_max then
      Some (0, lc_max, next_rr, rr_max)
    else
      None


(* [pull n queue_opt] takes up to [n] jobs off the queue [queue_opt] and
   returns them in queue order. Fewer than [n] jobs are returned when the
   queue runs out; [n <= 0] yields the empty list.

   Side effect: whenever the queue position changes, the new position is
   stored in [current_job_queue] ([None] once the queue is exhausted).

   Differences to a naive recursive version:
   - The loop is tail-recursive, so the stack depth does not grow with the
     number of jobs taken.
   - A queue whose counters are already out of bounds (as it happens for a
     matrix with a zero dimension, e.g. [Some(0,0,0,0)]) is treated as
     exhausted. [incr_queue] tests its bounds with [=], so stepping such a
     queue would never terminate and would hand out out-of-range jobs.
 *)
let pull n queue_opt =
  let start =
    match queue_opt with
      | Some (lc, lc_max, rr, rr_max) when lc >= lc_max || rr >= rr_max ->
          None
      | _ ->
          queue_opt in
  let rec take remaining queue acc =
    match queue with
      | Some (lc, lc_max, rr, rr_max) when remaining > 0 ->
          (* The "queue" is actually represented as two counters lc and rr *)
          let job = { left_col = lc; right_row = rr } in
          let queue' = incr_queue (lc, lc_max, rr, rr_max) in
          take (remaining - 1) queue' (job :: acc)
      | _ ->
          (* Either no job is wanted any more, or the queue is empty *)
          (queue, List.rev acc) in
  let (rest, jobs) = take n start [] in
  (* Taking at least one job always moves the position, so this publishes
     exactly when something changed. *)
  if rest <> queue_opt then
    current_job_queue := rest;
  jobs


(* [proc_pull_jobs n] removes up to [n] jobs from the global job queue
   (via [pull], which also updates [current_job_queue]) and returns them
   as an array. *)
let proc_pull_jobs n =
  !current_job_queue |> pull n |> Array.of_list
	  

(* [proc_put_results results] stores every value of [results] in the
   current result matrix, at row [right_row] and column [left_col] of the
   job the value belongs to. When there is no result matrix the results
   are silently dropped. *)
let proc_put_results results =
  match !current_result with
    | Some target ->
        for i = 0 to Array.length results - 1 do
          let res = results.(i) in
          let job = res.res_job in
          target.( job.right_row ).( job.left_col ) <- res.res_val
        done
    | None ->
        ()


(* [fill m rows cols] overwrites the upper-left [rows] x [cols] part of the
   matrix [m] with random floats drawn by [Random.float 1.0]. The cells are
   visited row by row, so the order in which random numbers are consumed is
   fixed. *)
let fill m rows cols =
  for row = 0 to pred rows do
    for col = 0 to pred cols do
      m.(row).(col) <- Random.float 1.0
    done
  done
    


(* [proc_test_multiply workers _ (lrows,rcols,rrows) emit] sets up a test
   multiplication of two random matrices and starts all workers.

   - [workers]: list of [(host,port)] pairs, one RPC client is created per
     entry
   - second argument: ignored
   - [(lrows,rcols,rrows)]: row count of the left matrix, column count of
     the right matrix, row count of the right matrix. The column count of
     the left matrix is set to [rrows].
   - [emit]: called with [()] from the callback of the last worker that
     reports back

   The global state ([current_matrices], [current_result],
   [current_job_queue]) is overwritten at the start, and the matrices and
   the result are cleared again when the last worker has reported back.

   NOTE(review): if [workers] is empty, no callback is ever installed, so
   nothing in this function calls [emit] and the global state is not
   cleared. Confirm whether an empty worker list can occur.

   NOTE(review): the result matrix is allocated as [rrows] x [lcols]
   (both equal to [rrows]), not [lrows] x [rcols] as one would expect for
   a matrix product. This is consistent with the job queue bounds and the
   indexing in [proc_put_results], but should be checked against the
   worker code.
 *)
let proc_test_multiply workers _ (lrows,rcols,rrows) emit =
  (* This is an asynchronous RPC implementation. This means we don't have to
     reply the result immediately. Instead we get an [emit] function, and
     we can call this function at some time in the future to pass the result
     value back to the caller of the RPC. 
   *)
  let lcols = rrows in
  let lmatrix = Array.make_matrix lrows lcols 0.0 in
  let rmatrix = Array.make_matrix rrows rcols 0.0 in
  fill lmatrix lrows lcols;
  fill rmatrix rrows rcols;
  current_matrices := Some(lmatrix,rmatrix);
  current_result := Some(Array.make_matrix rrows lcols 0.0);
  (* Counters start at (0,0); the bounds are lcols and rrows *)
  current_job_queue := Some(0, lcols, 0, rrows);
  
  (* Now start the computations by telling all workers to go: *)
  (* [n] counts the workers that have been started but have not yet
     reported back *)
  let n = ref 0 in
  let esys = (Netplex_cenv.self_cont()) # event_system in
  (* All clients created below, kept so that they can be shut down at the
     end *)
  let worker_clients = ref [] in
  List.iter
    (fun (host,port) ->
       let worker =
	 Mm_proto_clnt.Worker.V1.create_client2
	   ~esys
	   (`Socket(Rpc.Tcp,
		    Rpc_client.Inet(host,port),
		    Rpc_client.default_socket_config)) in
       worker_clients := worker :: !worker_clients;
       Mm_proto_clnt.Worker.V1.run'async
	 worker
	 ()
	 (fun get_result ->
	    (* This function is called back when the worker passes a result
               back for "run"
	     *)
	    decr n;
	    (* A failed worker is only logged; it still counts as done *)
	    ( try let () = get_result() in () (* check for exceptions *)
	      with error ->
		Netplex_cenv.logf `Err "Error from worker: %s"
		  (Printexc.to_string error)
	    );
	    if !n=0 then (
	      (* All workers done! *)
	      (* NOTE(review): if the queue has not been drained at this
	         point (e.g. because workers failed), this assertion raises
	         before [emit] is called and before the clients are shut
	         down. *)
	      assert(!current_job_queue = None);
	      (* Delete the operand matrices and the result: *)
	      current_matrices := None;
	      current_result := None;
	      emit ();
	      List.iter Rpc_client.shut_down !worker_clients
	    )
	 );
       (* NOTE(review): [n] is incremented only after [run'async] has
          returned. The count is right provided the callback is never
          invoked from within [run'async] itself -- presumably it is only
          run later by the event system; confirm. *)
       incr n
    )
    workers



(* [configure cf addr] reads the worker list from the configuration: for
   every "worker" section below [addr] the parameters "host" (string) and
   "port" (int) are looked up. Returns the list of [(host,port)] pairs in
   section order. Raises [Failure] when one of the two parameters is
   missing in a section. *)
let configure cf addr =
  let worker_of_section w_addr =
    let host =
      try cf # string_param (cf # resolve_parameter w_addr "host")
      with Not_found -> failwith "Required param host is missing" in
    let port =
      try cf # int_param (cf # resolve_parameter w_addr "port")
      with Not_found -> failwith "Required param port is missing" in
    (host, port) in
  List.map worker_of_section (cf # resolve_section addr "worker")
  

(* [setup srv workers] binds both RPC programs to the server [srv]:
   first the asynchronous Multiplier program (ping and test_multiply, the
   latter closed over [workers]), then the synchronous Controller program
   (ping, get_dim, get_row, pull_jobs, put_results). *)
let setup srv workers =
  (* The asynchronous ping replies immediately *)
  let ping_async _ () emit = emit () in
  let ping_sync () = () in
  Mm_proto_srv.Multiplier.V1.bind_async
    ~proc_ping:ping_async
    ~proc_test_multiply:(proc_test_multiply workers)
    srv;
  Mm_proto_srv.Controller.V1.bind
    ~proc_ping:ping_sync
    ~proc_get_dim
    ~proc_get_row
    ~proc_pull_jobs
    ~proc_put_results
    srv


(* [controller_factory()] creates the RPC factory named "mm_controller",
   built from [configure] and [setup]. Its processor hooks are the empty
   hooks, except that [post_start_hook] seeds the random number generator
   with [Random.self_init]. *)
let controller_factory() =
  let hooks _ =
    object
      inherit Netplex_kit.empty_processor_hooks()
      method post_start_hook _ =
        Random.self_init()
    end in
  Rpc_netplex.rpc_factory
    ~configure
    ~name:"mm_controller"
    ~setup
    ~hooks
    ()