1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
|
(* Implementation of the controller/multiplier *)
open Mm_proto_aux
(* The state is kept in global variables for simplicity. This means we can
only do one multiplication at a time.
*)
let current_matrices = ref None
let current_job_queue = ref None
let current_result = ref None
let with_matrix f =
match !current_matrices with
| None ->
failwith "No matrix"
| Some(l,r) ->
f l r
let proc_get_dim which =
with_matrix
(fun l r ->
let m =
if which = left then l else r in
let rows = Array.length m in
let cols = if rows > 0 then Array.length m.(0) else 0 in
{ rows = rows;
columns = cols
}
)
let proc_get_row (which,row) =
with_matrix
(fun l r ->
let m =
if which = left then l else r in
m.(row)
)
let incr_queue (lc, lc_max, rr, rr_max) =
let lc' = lc+1 in
if lc' = lc_max then (
let rr' = rr+1 in
if rr' = rr_max then
None
else
Some(0, lc_max, rr', rr_max)
)
else
Some (lc', lc_max, rr, rr_max)
let rec pull n queue_opt =
match queue_opt with
| None ->
[]
| Some(lc, lc_max, rr, rr_max) ->
(* The "queue" is actually represented as two counters lc and rr *)
if n > 0 then (
let j = { left_col = lc; right_row = rr } in
let queue' = incr_queue (lc, lc_max, rr, rr_max) in
current_job_queue := queue';
j :: pull (n-1) queue'
)
else
[]
let proc_pull_jobs n =
let jobs = pull n !current_job_queue in
(Array.of_list jobs)
let proc_put_results results =
match !current_result with
| None ->
()
| Some m ->
Array.iter
(fun r ->
m.( r.res_job.right_row ).( r.res_job.left_col ) <- r.res_val
)
results;
()
let fill m rows cols =
for j = 0 to rows-1 do
for k = 0 to cols-1 do
m.(j).(k) <- Random.float 1.0
done
done
let proc_test_multiply workers _ (lrows,rcols,rrows) emit =
(* This is an asynchronous RPC implmentation. This means we don't have to
reply the result immediately. Instead we get an [emit] function, and
we can call this function at some time in the future to pass the result
value back to the caller of the RPC.
*)
let lcols = rrows in
let lmatrix = Array.make_matrix lrows lcols 0.0 in
let rmatrix = Array.make_matrix rrows rcols 0.0 in
fill lmatrix lrows lcols;
fill rmatrix rrows rcols;
current_matrices := Some(lmatrix,rmatrix);
current_result := Some(Array.make_matrix rrows lcols 0.0);
current_job_queue := Some(0, lcols, 0, rrows);
(* Now start the computations by telling all workers to go: *)
let n = ref 0 in
let esys = (Netplex_cenv.self_cont()) # event_system in
let worker_clients = ref [] in
List.iter
(fun (host,port) ->
let worker =
Mm_proto_clnt.Worker.V1.create_client2
~esys
(`Socket(Rpc.Tcp,
Rpc_client.Inet(host,port),
Rpc_client.default_socket_config)) in
worker_clients := worker :: !worker_clients;
Mm_proto_clnt.Worker.V1.run'async
worker
()
(fun get_result ->
(* This function is called back when the worker passes a result
back for "run"
*)
decr n;
( try let () = get_result() in () (* check for exceptions *)
with error ->
Netplex_cenv.logf `Err "Error from worker: %s"
(Printexc.to_string error)
);
if !n=0 then (
(* All workers done! *)
assert(!current_job_queue = None);
(* Delete the result: *)
current_matrices := None;
current_result := None;
emit ();
List.iter Rpc_client.shut_down !worker_clients
)
);
incr n
)
workers
let configure cf addr =
let workers_sect =
cf # resolve_section addr "worker" in
let workers =
List.map
(fun w_addr ->
let host =
try cf # string_param(cf # resolve_parameter w_addr "host")
with Not_found ->
failwith "Required param host is missing" in
let port =
try cf # int_param(cf # resolve_parameter w_addr "port")
with Not_found ->
failwith "Required param port is missing" in
(host,port)
)
workers_sect
in
workers
let setup srv workers =
Mm_proto_srv.Multiplier.V1.bind_async
~proc_ping:(fun _ () emit -> emit())
~proc_test_multiply:(proc_test_multiply workers)
srv;
Mm_proto_srv.Controller.V1.bind
~proc_ping:(fun () -> ())
~proc_get_dim
~proc_get_row
~proc_pull_jobs
~proc_put_results
srv
let controller_factory() =
Rpc_netplex.rpc_factory
~configure
~name:"mm_controller"
~setup
~hooks:(fun _ ->
( object(self)
inherit Netplex_kit.empty_processor_hooks()
method post_start_hook _ =
Random.self_init()
end
)
)
()
|