File: mm_worker.ml

package info (click to toggle)
ocamlnet 4.1.9-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 54,040 kB
  • sloc: ml: 151,939; ansic: 11,071; sh: 2,003; makefile: 1,310
file content (86 lines) | stat: -rw-r--r-- 2,290 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
(* Implementation of the worker *)

open Mm_proto_aux


let jobs_at_once = 100


let proc_run (controller_host, controller_port) () =
  let controller =
    Mm_proto_clnt.Controller.V1.create_client2
      (`Socket(Rpc.Tcp,
	       Rpc_client.Inet(controller_host,controller_port),
	       Rpc_client.default_socket_config)) in

  (* Get the dimension of the matrices, and retrieve them from the ctrl:*)
  let ldim =
    Mm_proto_clnt.Controller.V1.get_dim controller left in
  let rdim =
    Mm_proto_clnt.Controller.V1.get_dim controller right in
  assert(ldim.rows = rdim.columns);

  let lmatrix =
    Array.make ldim.rows [| |] in
  for j = 0 to ldim.rows-1 do
    lmatrix.(j) <- Mm_proto_clnt.Controller.V1.get_row controller (left,j)
  done;
  let rmatrix =
    Array.make rdim.rows [| |] in
  for j = 0 to ldim.rows-1 do
    rmatrix.(j) <- Mm_proto_clnt.Controller.V1.get_row controller (right,j)
  done;
  
  (* Get jobs until there are no more jobs. *)
  let cont = ref true in
  while !cont do
    let jobs = Mm_proto_clnt.Controller.V1.pull_jobs controller jobs_at_once in
    cont := (jobs <> [| |]);
    
    let results = ref [] in
    Array.iter
      (fun job ->
	 let lcol = job.left_col in
	 let rrow = job.right_row in
	 let s = ref 0.0 in
	 for j = 0 to ldim.rows-1 do
	   s := !s +. lmatrix.(j).(lcol) *. rmatrix.(rrow).(j)
	 done;
	 results :=
	   { res_job = job;
	     res_val = !s
	   } :: !results
      )
      jobs;
    
    Mm_proto_clnt.Controller.V1.put_results controller (Array.of_list !results)
  done;

  (* Done: return "()" to caller *)
  ()


let configure cf addr =
  let controller_host =
    try cf # string_param(cf # resolve_parameter addr "controller_host")
    with Not_found ->
      failwith "Required param controller_host is missing" in
  let controller_port =
    try cf # int_param(cf # resolve_parameter addr "controller_port")
    with Not_found ->
      failwith "Required param controller_port is missing" in
  (controller_host, controller_port)
  

let setup srv (controller_host, controller_port) =
  Mm_proto_srv.Worker.V1.bind
    ~proc_ping:(fun () -> ())
    ~proc_run:(proc_run (controller_host, controller_port))
    srv

let worker_factory() =
  Rpc_netplex.rpc_factory
    ~configure
    ~name:"mm_worker"
    ~setup
    ()