File: netplex_cenv.ml

package info (click to toggle)
ocamlnet 4.1.2-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 51,764 kB
  • ctags: 16,446
  • sloc: ml: 148,419; ansic: 10,989; sh: 1,885; makefile: 1,355
file content (482 lines) | stat: -rw-r--r-- 11,618 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
(* $Id$ *)

open Netplex_types
open Printf

module Debug = struct
  let enable = ref false
end

let dlog = Netlog.Debug.mk_dlog "Netplex_cenv" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Netplex_cenv" Debug.enable

let () =
  Netlog.Debug.register_module "Netplex_cenv" Debug.enable


exception Not_in_container_thread

let obj_of_thread = Hashtbl.create 10
  (* We assume here that obj_of_thread is filled with all parallelizers
     before Netplex starts the first thread. Hence, it is not protected
     by mutexes
   *)

let register_par par =
  if not (Hashtbl.mem obj_of_thread par#ptype) then (
    let (lock, unlock) = par # create_mem_mutex() in
    let m = Hashtbl.create 10 in
    Hashtbl.add obj_of_thread par#ptype (lock, unlock, par, m)
  )
;;


let register_cont (cont : container) thread =
  if thread#ptype <> `Controller_attached then (
    dlogr (fun () ->
	     sprintf "register_cont cont=%d thread=%s"
	       (Oo.id cont) thread#info_string);
    let (lock, unlock, par, m) =
      try Hashtbl.find obj_of_thread thread#ptype
      with Not_found -> 
	failwith "Netplex_cenv.register_cont: Unknown parallelizer type" in
    
    lock();
    Hashtbl.replace m thread#sys_id (`Container cont);
    unlock()
  )
;;


let register_ctrl (ctrl : controller) =
  if ctrl#ptype <> `Controller_attached then (
    let (lock, unlock, par, m) =
      try Hashtbl.find obj_of_thread ctrl#ptype
      with Not_found -> 
	failwith "Netplex_cenv.register_ctrl: Unknown parallelizer type" in
    
    lock();
    Hashtbl.replace m ctrl#sys_id (`Controller ctrl);
    unlock()
  )
;;


let unregister ptype sys_id =
  let (lock, unlock, par, m) =
    try Hashtbl.find obj_of_thread ptype
    with Not_found -> 
      failwith "Netplex_cenv.unregister: Unknown parallelizer type" in
    
  lock();
  Hashtbl.remove m sys_id;
  unlock();
  dlogr (fun () ->
	   sprintf "unregister remaining_objects=%d"
	     (Hashtbl.length m))


let unregister_cont cont thread =
  if thread#ptype <> `Controller_attached then (
    dlogr (fun () ->
	     sprintf "unregister_cont cont=%d thread=%s"
	       (Oo.id cont) thread#info_string);
    unregister thread#ptype thread#sys_id
  )
;;


let unregister_ctrl ctrl =
  if ctrl#ptype <> `Controller_attached then
    unregister ctrl#ptype ctrl#sys_id
;;



exception Found

let self_obj_par() =
  (* We do not know the parallelizer, so simply try them one after the other *)
  let found = ref None in
  try
    Hashtbl.iter
      (fun ptype (lock, unlock, par, m) ->
	 let my_sys_id = par # current_sys_id in
	 lock();
	 try
	   let obj = Hashtbl.find m my_sys_id in
	   unlock();
	   found := Some (obj, par);
	   raise Found
	 with
	   | Not_found ->
	       unlock();
      )
      obj_of_thread;
    raise Not_found
  with
    | Found ->
	match !found with
	  | None -> assert false
	  | Some (obj,par) -> (obj,par)
;;

let self_cont_par() =
  try
    match self_obj_par() with
      | (`Container c, par) -> (c,par)
      | _ -> raise Not_found
  with
    | Not_found -> raise Not_in_container_thread


let self_cont() =
  fst(self_cont_par())

let self_obj() =
  fst(self_obj_par())

let self_par() =
  try
    snd(self_obj_par())
  with
    | Not_found -> raise Not_in_container_thread

let current_sys_id() =
  (self_par()) # current_sys_id


let log level msg =
  let obj,_ = 
    try self_obj_par() 
    with Not_found -> raise Not_in_container_thread in
  match obj with
    | `Container cont -> 
	cont # log level msg
    | `Controller ctrl -> 
	ctrl # logger # log 
	  ~component:"netplex.controller"
	  ~level ~message:msg

let logf level fmt =
  Printf.ksprintf (log level) fmt

let report_connection_string fd detail =
  let fd_name =
    try Netsys.string_of_sockaddr(Unix.getsockname fd)
    with _ -> "*" in
  let fd_peer =
    try Netsys.string_of_sockaddr(Netsys.getpeername fd)
    with _ -> "*" in
  let cid =
    match current_sys_id() with
      | `Process pid -> "pid " ^ string_of_int pid
      | `Thread pid -> "thr " ^ string_of_int pid in
  sprintf "netplex.connection (%s) %s -> %s: %s"
    cid fd_peer fd_name
    (if detail = "" then "(ok)" else detail)


type timer = < f : timer -> bool; tmo : float; cont : container > ;;

let timer_table = Hashtbl.create 50
let timer_mutex = ( !Netsys_oothr.provider ) # create_mutex()


let cancel_timer_int do_clear tobj =
  let cont = self_cont() in
  dlogr (fun () -> sprintf "cancel_timer timer=%d cont=%d"
	   (Oo.id tobj) (Oo.id cont));
  let esys = cont#event_system in
  timer_mutex # lock();
  let g_opt =
    try Some(Hashtbl.find timer_table tobj) with Not_found -> None in
  Hashtbl.remove timer_table tobj;
  timer_mutex # unlock();
  if do_clear then
    match g_opt with
      | None -> ()
      | Some g -> 
	  Unixqueue.clear esys g


let cancel_timer = cancel_timer_int true

let rec restart_timer tobj g =
  let cont = self_cont() in
  dlogr (fun () -> sprintf "restart_timer timer=%d cont=%d"
	   (Oo.id tobj) (Oo.id cont));
  let esys = cont#event_system in
  timer_mutex # lock();
  Hashtbl.add timer_table tobj g;
  timer_mutex # unlock();
  Unixqueue.once esys g tobj#tmo 
    (fun () ->
       cancel_timer_int false tobj;
        dlogr (fun () -> sprintf "callback_timer timer=%d cont=%d"
		 (Oo.id tobj) (Oo.id cont));
       (* We let exceptions fall through to Netplex_container.run *)
       let flag = tobj#f tobj in
       if flag then restart_timer tobj g
    )


let create_timer f tmo =
  let cont = self_cont() in
  let esys = cont#event_system in
  let g = Unixqueue.new_group esys in
  let tobj =
    ( object
	method f = f
	method tmo = tmo
	method cont = cont
      end
    ) in
  dlogr (fun () -> sprintf "create_timer timer=%d cont=%d"
	   (Oo.id tobj) (Oo.id cont));
  restart_timer tobj g;
  tobj
  

let cancel_all_timers() =
  let cont = self_cont() in
  dlogr (fun () -> sprintf "cancel_all_timers cont=%d" (Oo.id cont));
  let esys = cont#event_system in
  timer_mutex # lock();
  let tlist = ref [] in
  Hashtbl.iter
    (fun tobj g ->
       if tobj # cont = cont then (
	 Unixqueue.clear esys g;
	 tlist := tobj :: !tlist
       )
    )
    timer_table;
  List.iter
    (fun tobj ->
       Hashtbl.remove timer_table tobj
    )
    !tlist;
  timer_mutex # unlock()


let timer_id tobj =
  Oo.id tobj


exception Container_variable_not_found of string
exception Container_variable_type_mismatch of string

let get_var name =
  let cont = self_cont() in
  try cont # var name 
  with Not_found -> raise(Container_variable_not_found name)

let int_var name =
  match get_var name with
    | `Int i -> i
    | _ -> raise(Container_variable_type_mismatch name)

let string_var name =
  match get_var name with
    | `String i -> i
    | _ -> raise(Container_variable_type_mismatch name)

let float_var name =
  match get_var name with
    | `Float i -> i
    | _ -> raise(Container_variable_type_mismatch name)

let bool_var name =
  match get_var name with
    | `Bool i -> i
    | _ -> raise(Container_variable_type_mismatch name)

let set_int_var name i =
  let cont = self_cont() in
  cont # set_var name (`Int i)

let set_string_var name i =
  let cont = self_cont() in
  cont # set_var name (`String i)

let set_float_var name i =
  let cont = self_cont() in
  cont # set_var name (`Float i)

let set_bool_var name i =
  let cont = self_cont() in
  cont # set_var name (`Bool i)

let make_var_type wrap unwrap =
  let get name =
    match get_var name with
      | `Encap x ->
	  ( try unwrap x
	    with
	      | Netplex_encap.Type_mismatch -> 
		  raise(Container_variable_type_mismatch name)
	  )
      | _ ->
	  raise(Container_variable_type_mismatch name) in
  let set name x =
    let cont = self_cont() in
    cont # set_var name (`Encap (wrap x)) in
  (get, set)

module type TYPE = sig type t end

module type VAR_TYPE = sig
  type t 
  val get : string -> t
  val set : string -> t -> unit
end

module Make_var_type(T:TYPE) = struct
  type t = T.t
  module E = Netplex_encap.Make_encap(T)
  let (get, set) = make_var_type E.wrap E.unwrap
end


let admin_connector() =
  let cont = self_cont() in
  match cont#lookup "netplex.controller" "admin" with
    | None ->
	failwith "Netplex_cenv.admin_connector: Socket not found"
    | Some path ->
	let c = Netplex_util.any_file_client_connector path in
	`Socket(Rpc.Tcp,
		c,
		Rpc_client.default_socket_config)

let admin_call f =
  let conn = admin_connector() in
  let client = Netplex_ctrl_clnt.Admin.V2.create_client2 conn in
  try
    f client ();
    Rpc_client.shut_down client
  with
    | err ->
	Rpc_client.shut_down client;
	raise err

let system_shutdown() =
  match self_obj() with
    | `Container _ ->
	admin_call Netplex_ctrl_clnt.Admin.V2.system_shutdown
    | `Controller ctrl ->
	ctrl # shutdown()

let system_restart() =
  match self_obj() with
    | `Container _ ->
	admin_call Netplex_ctrl_clnt.Admin.V2.restart_all
    | `Controller ctrl ->
	ctrl # restart()

let send_message pat msg args =
  match self_obj() with
    | `Container cont ->
	cont # send_message pat msg args
    | `Controller ctrl ->
	ctrl # send_message pat msg args

let lookup sname pname =
  let cont = self_cont() in
  cont # lookup sname pname

let lookup_container_sockets sname pname =
  let cont = self_cont() in
  cont # lookup_container_sockets sname pname

let pmanage() =
  let obj,_ = 
    try self_obj_par() 
    with Not_found -> raise Not_in_container_thread in
  let sockdir =
    match obj with
      | `Container cont -> 
          cont # socket_service # socket_service_config # controller_config 
            # socket_directory
    | `Controller ctrl -> 
        ctrl # controller_config # socket_directory in
  Netsys_pmanage.pmanage (Filename.concat sockdir "netplex.pmanage")

let run_in_esys esys f =
  let mutex = !Netsys_oothr.provider # create_mutex() in
  let cond = !Netsys_oothr.provider # create_condition() in
  let g = Unixqueue.new_group esys in
  let r = ref (fun () -> assert false) in
  Unixqueue.once esys g 0.0
    (fun () ->
       ( try
	   f();
	   mutex # lock();
	   r := (fun () -> ());
	   mutex # unlock();
	 with
	   | e -> 
	       mutex # lock();
	       r := (fun () -> raise e);
	       mutex # unlock();
       );
       cond # signal()
    );
  mutex # lock();
  cond # wait mutex;
  mutex # unlock();
  !r()
  

let run_in_controller_context ctrl f =
  if ctrl#ptype <> `Multi_threading then
    failwith "Netplex_cenv.run_in_controller_context: only possible for multi-threaded environments";
  let esys = ctrl # event_system in
  run_in_esys esys f


let run_in_container_context cont f =
  if cont#ptype <> `Multi_threading then
    failwith "Netplex_cenv.run_in_container_context: only possible for multi-threaded environments";
  let esys = cont # event_system in
  run_in_esys esys f


module type FUN_TYPE = 
  sig 
    type s  (** argument type *)
    type r  (** result type *)
  end

module type LEVER = sig
  type s  (** argument type *)
  type r  (** result type *)
  type t = s->r

  val register : Netplex_types.controller -> 
                 (Netplex_types.controller -> t) -> t
end

module Make_lever(T:FUN_TYPE) = struct
  type s = T.s
  type r = T.r
  type t = s->r

  module ES = Netplex_encap.Make_encap(struct type t = s end)
  module ER = Netplex_encap.Make_encap(struct type t = r end)

  let register ctrl raw_lever =
    let id =
      ctrl # register_lever
	(fun ctrl enc_arg ->
	   let arg = ES.unwrap enc_arg in
	   let res = raw_lever ctrl arg in
	   ER.wrap res
	) in
    (fun arg ->
       let cont = self_cont() in
       let res_enc = cont # activate_lever id (ES.wrap arg) in
       ER.unwrap res_enc
    )
end