1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
|
(* $Id$ *)
open Netplex_types
open Printf
module Debug = struct
let enable = ref false
end
let dlog = Netlog.Debug.mk_dlog "Netplex_cenv" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Netplex_cenv" Debug.enable
let () =
Netlog.Debug.register_module "Netplex_cenv" Debug.enable
exception Not_in_container_thread
let obj_of_thread = Hashtbl.create 10
(* We assume here that obj_of_thread is filled with all parallelizers
before Netplex starts the first thread. Hence, it is not protected
by mutexes
*)
let register_par par =
if not (Hashtbl.mem obj_of_thread par#ptype) then (
let (lock, unlock) = par # create_mem_mutex() in
let m = Hashtbl.create 10 in
Hashtbl.add obj_of_thread par#ptype (lock, unlock, par, m)
)
;;
let register_cont (cont : container) thread =
if thread#ptype <> `Controller_attached then (
dlogr (fun () ->
sprintf "register_cont cont=%d thread=%s"
(Oo.id cont) thread#info_string);
let (lock, unlock, par, m) =
try Hashtbl.find obj_of_thread thread#ptype
with Not_found ->
failwith "Netplex_cenv.register_cont: Unknown parallelizer type" in
lock();
Hashtbl.replace m thread#sys_id (`Container cont);
unlock()
)
;;
let register_ctrl (ctrl : controller) =
if ctrl#ptype <> `Controller_attached then (
let (lock, unlock, par, m) =
try Hashtbl.find obj_of_thread ctrl#ptype
with Not_found ->
failwith "Netplex_cenv.register_ctrl: Unknown parallelizer type" in
lock();
Hashtbl.replace m ctrl#sys_id (`Controller ctrl);
unlock()
)
;;
let unregister ptype sys_id =
let (lock, unlock, par, m) =
try Hashtbl.find obj_of_thread ptype
with Not_found ->
failwith "Netplex_cenv.unregister: Unknown parallelizer type" in
lock();
Hashtbl.remove m sys_id;
unlock();
dlogr (fun () ->
sprintf "unregister remaining_objects=%d"
(Hashtbl.length m))
let unregister_cont cont thread =
if thread#ptype <> `Controller_attached then (
dlogr (fun () ->
sprintf "unregister_cont cont=%d thread=%s"
(Oo.id cont) thread#info_string);
unregister thread#ptype thread#sys_id
)
;;
let unregister_ctrl ctrl =
if ctrl#ptype <> `Controller_attached then
unregister ctrl#ptype ctrl#sys_id
;;
exception Found
let self_obj_par() =
(* We do not know the parallelizer, so simply try them one after the other *)
let found = ref None in
try
Hashtbl.iter
(fun ptype (lock, unlock, par, m) ->
let my_sys_id = par # current_sys_id in
lock();
try
let obj = Hashtbl.find m my_sys_id in
unlock();
found := Some (obj, par);
raise Found
with
| Not_found ->
unlock();
)
obj_of_thread;
raise Not_found
with
| Found ->
match !found with
| None -> assert false
| Some (obj,par) -> (obj,par)
;;
let self_cont_par() =
try
match self_obj_par() with
| (`Container c, par) -> (c,par)
| _ -> raise Not_found
with
| Not_found -> raise Not_in_container_thread
let self_cont() =
fst(self_cont_par())
let self_obj() =
fst(self_obj_par())
let self_par() =
try
snd(self_obj_par())
with
| Not_found -> raise Not_in_container_thread
let current_sys_id() =
(self_par()) # current_sys_id
let log level msg =
let obj,_ =
try self_obj_par()
with Not_found -> raise Not_in_container_thread in
match obj with
| `Container cont ->
cont # log level msg
| `Controller ctrl ->
ctrl # logger # log
~component:"netplex.controller"
~level ~message:msg
let logf level fmt =
Printf.ksprintf (log level) fmt
let report_connection_string fd detail =
let fd_name =
try Netsys.string_of_sockaddr(Unix.getsockname fd)
with _ -> "*" in
let fd_peer =
try Netsys.string_of_sockaddr(Netsys.getpeername fd)
with _ -> "*" in
let cid =
match current_sys_id() with
| `Process pid -> "pid " ^ string_of_int pid
| `Thread pid -> "thr " ^ string_of_int pid in
sprintf "netplex.connection (%s) %s -> %s: %s"
cid fd_peer fd_name
(if detail = "" then "(ok)" else detail)
type timer = < f : timer -> bool; tmo : float; cont : container > ;;
let timer_table = Hashtbl.create 50
let timer_mutex = ( !Netsys_oothr.provider ) # create_mutex()
let cancel_timer_int do_clear tobj =
let cont = self_cont() in
dlogr (fun () -> sprintf "cancel_timer timer=%d cont=%d"
(Oo.id tobj) (Oo.id cont));
let esys = cont#event_system in
timer_mutex # lock();
let g_opt =
try Some(Hashtbl.find timer_table tobj) with Not_found -> None in
Hashtbl.remove timer_table tobj;
timer_mutex # unlock();
if do_clear then
match g_opt with
| None -> ()
| Some g ->
Unixqueue.clear esys g
let cancel_timer = cancel_timer_int true
let rec restart_timer tobj g =
let cont = self_cont() in
dlogr (fun () -> sprintf "restart_timer timer=%d cont=%d"
(Oo.id tobj) (Oo.id cont));
let esys = cont#event_system in
timer_mutex # lock();
Hashtbl.add timer_table tobj g;
timer_mutex # unlock();
Unixqueue.once esys g tobj#tmo
(fun () ->
cancel_timer_int false tobj;
dlogr (fun () -> sprintf "callback_timer timer=%d cont=%d"
(Oo.id tobj) (Oo.id cont));
(* We let exceptions fall through to Netplex_container.run *)
let flag = tobj#f tobj in
if flag then restart_timer tobj g
)
let create_timer f tmo =
let cont = self_cont() in
let esys = cont#event_system in
let g = Unixqueue.new_group esys in
let tobj =
( object
method f = f
method tmo = tmo
method cont = cont
end
) in
dlogr (fun () -> sprintf "create_timer timer=%d cont=%d"
(Oo.id tobj) (Oo.id cont));
restart_timer tobj g;
tobj
let cancel_all_timers() =
let cont = self_cont() in
dlogr (fun () -> sprintf "cancel_all_timers cont=%d" (Oo.id cont));
let esys = cont#event_system in
timer_mutex # lock();
let tlist = ref [] in
Hashtbl.iter
(fun tobj g ->
if tobj # cont = cont then (
Unixqueue.clear esys g;
tlist := tobj :: !tlist
)
)
timer_table;
List.iter
(fun tobj ->
Hashtbl.remove timer_table tobj
)
!tlist;
timer_mutex # unlock()
let timer_id tobj =
Oo.id tobj
exception Container_variable_not_found of string
exception Container_variable_type_mismatch of string
let get_var name =
let cont = self_cont() in
try cont # var name
with Not_found -> raise(Container_variable_not_found name)
let int_var name =
match get_var name with
| `Int i -> i
| _ -> raise(Container_variable_type_mismatch name)
let string_var name =
match get_var name with
| `String i -> i
| _ -> raise(Container_variable_type_mismatch name)
let float_var name =
match get_var name with
| `Float i -> i
| _ -> raise(Container_variable_type_mismatch name)
let bool_var name =
match get_var name with
| `Bool i -> i
| _ -> raise(Container_variable_type_mismatch name)
let set_int_var name i =
let cont = self_cont() in
cont # set_var name (`Int i)
let set_string_var name i =
let cont = self_cont() in
cont # set_var name (`String i)
let set_float_var name i =
let cont = self_cont() in
cont # set_var name (`Float i)
let set_bool_var name i =
let cont = self_cont() in
cont # set_var name (`Bool i)
let make_var_type wrap unwrap =
let get name =
match get_var name with
| `Encap x ->
( try unwrap x
with
| Netplex_encap.Type_mismatch ->
raise(Container_variable_type_mismatch name)
)
| _ ->
raise(Container_variable_type_mismatch name) in
let set name x =
let cont = self_cont() in
cont # set_var name (`Encap (wrap x)) in
(get, set)
module type TYPE = sig type t end
module type VAR_TYPE = sig
type t
val get : string -> t
val set : string -> t -> unit
end
module Make_var_type(T:TYPE) = struct
type t = T.t
module E = Netplex_encap.Make_encap(T)
let (get, set) = make_var_type E.wrap E.unwrap
end
let admin_connector() =
let cont = self_cont() in
match cont#lookup "netplex.controller" "admin" with
| None ->
failwith "Netplex_cenv.admin_connector: Socket not found"
| Some path ->
let c = Netplex_util.any_file_client_connector path in
`Socket(Rpc.Tcp,
c,
Rpc_client.default_socket_config)
let admin_call f =
let conn = admin_connector() in
let client = Netplex_ctrl_clnt.Admin.V2.create_client2 conn in
try
f client ();
Rpc_client.shut_down client
with
| err ->
Rpc_client.shut_down client;
raise err
let system_shutdown() =
match self_obj() with
| `Container _ ->
admin_call Netplex_ctrl_clnt.Admin.V2.system_shutdown
| `Controller ctrl ->
ctrl # shutdown()
let system_restart() =
match self_obj() with
| `Container _ ->
admin_call Netplex_ctrl_clnt.Admin.V2.restart_all
| `Controller ctrl ->
ctrl # restart()
let send_message pat msg args =
match self_obj() with
| `Container cont ->
cont # send_message pat msg args
| `Controller ctrl ->
ctrl # send_message pat msg args
let lookup sname pname =
let cont = self_cont() in
cont # lookup sname pname
let lookup_container_sockets sname pname =
let cont = self_cont() in
cont # lookup_container_sockets sname pname
let pmanage() =
let obj,_ =
try self_obj_par()
with Not_found -> raise Not_in_container_thread in
let sockdir =
match obj with
| `Container cont ->
cont # socket_service # socket_service_config # controller_config
# socket_directory
| `Controller ctrl ->
ctrl # controller_config # socket_directory in
Netsys_pmanage.pmanage (Filename.concat sockdir "netplex.pmanage")
let run_in_esys esys f =
let mutex = !Netsys_oothr.provider # create_mutex() in
let cond = !Netsys_oothr.provider # create_condition() in
let g = Unixqueue.new_group esys in
let r = ref (fun () -> assert false) in
Unixqueue.once esys g 0.0
(fun () ->
( try
f();
mutex # lock();
r := (fun () -> ());
mutex # unlock();
with
| e ->
mutex # lock();
r := (fun () -> raise e);
mutex # unlock();
);
cond # signal()
);
mutex # lock();
cond # wait mutex;
mutex # unlock();
!r()
let run_in_controller_context ctrl f =
if ctrl#ptype <> `Multi_threading then
failwith "Netplex_cenv.run_in_controller_context: only possible for multi-threaded environments";
let esys = ctrl # event_system in
run_in_esys esys f
let run_in_container_context cont f =
if cont#ptype <> `Multi_threading then
failwith "Netplex_cenv.run_in_container_context: only possible for multi-threaded environments";
let esys = cont # event_system in
run_in_esys esys f
module type FUN_TYPE =
sig
type s (** argument type *)
type r (** result type *)
end
module type LEVER = sig
type s (** argument type *)
type r (** result type *)
type t = s->r
val register : Netplex_types.controller ->
(Netplex_types.controller -> t) -> t
end
module Make_lever(T:FUN_TYPE) = struct
type s = T.s
type r = T.r
type t = s->r
module ES = Netplex_encap.Make_encap(struct type t = s end)
module ER = Netplex_encap.Make_encap(struct type t = r end)
let register ctrl raw_lever =
let id =
ctrl # register_lever
(fun ctrl enc_arg ->
let arg = ES.unwrap enc_arg in
let res = raw_lever ctrl arg in
ER.wrap res
) in
(fun arg ->
let cont = self_cont() in
let res_enc = cont # activate_lever id (ES.wrap arg) in
ER.unwrap res_enc
)
end
|