File: netplex_main.ml

package info (click to toggle)
ocamlnet 4.1.2-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 51,764 kB
  • ctags: 16,446
  • sloc: ml: 148,419; ansic: 10,989; sh: 1,885; makefile: 1,355
file content (402 lines) | stat: -rw-r--r-- 11,645 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
(* $Id$ *)

open Netplex_types
open Printf

(* Settings that control how a Netplex system is started. The fields are
   mutable; the [Arg] callbacks built by [args] update such a record in
   place while the command line is parsed.
 *)
type cmdline_config =
    { mutable config_filename_opt : string option;
        (* explicitly selected configuration file, if any *)
      mutable config_tree_opt : config_tree option;
        (* configuration given directly as a tree; when present,
           [get_config_pair] uses it instead of parsing the file *)
      mutable pidfile : string option;
        (* file the process ID is written to, if any *)
      mutable foreground : bool;
        (* [true]: do not daemonize, stay in the foreground *)
    }

(* [true] iff the runtime reports the OS type "Win32" *)
let is_win32 = (Sys.os_type = "Win32");;

(* Build a fresh command-line configuration. Arguments that are left out
   mean: no configuration file name, no configuration tree, no PID file,
   and not running in the foreground.
 *)
let create ?config_filename
           ?config_tree
           ?(pidfile = None)
           ?(foreground = false) () =
  let config_filename_opt = config_filename
  and config_tree_opt = config_tree in
  { config_filename_opt; config_tree_opt; pidfile; foreground }

(* Return a new configuration record that equals [cfg] except for the
   fields given as optional arguments. [cfg] itself is not changed, so
   calling [modify cfg] without options yields an independent copy.

   Note that [~pidfile] takes a [string option]: passing [~pidfile:None]
   clears the PID file setting.
 *)
let modify ?config_filename
           ?config_tree
           ?pidfile
           ?foreground cfg =
  (* The new value if one was passed, else the current one *)
  let override given current =
    match given with
      | Some v -> v
      | None -> current in
  (* Same, for fields that are options themselves *)
  let first_some given current =
    match given with
      | Some _ -> given
      | None -> current in
  { config_filename_opt = first_some config_filename cfg.config_filename_opt;
    config_tree_opt = first_some config_tree cfg.config_tree_opt;
    pidfile = override pidfile cfg.pidfile;
    foreground = override foreground cfg.foreground;
  }


(* Return [(spec, config)]: [spec] is an [Arg] specification list for the
   switches -conf, -pid and -fg, and [config] is the record these
   switches write into. [config] starts out as a copy of [defaults].
 *)
let args ?(defaults = create()) () =
  (* Work on a private copy so that [defaults] is never mutated *)
  let config = modify defaults in

  let set_conf s = config.config_filename_opt <- Some s
  and set_pid s = config.pidfile <- Some s
  and set_fg () = config.foreground <- true in

  let spec =
    [ ("-conf",
       Arg.String set_conf,
       "<file>  Read this configuration file");

      ("-pid",
       Arg.String set_pid,
       "<file>  Write this PID file");

      ("-fg",
       Arg.Unit set_fg,
       "  Start in the foreground and do not run as daemon");
    ] in
  (spec, config)
;;


(* Return the name of the configuration file to use. If none was set
   explicitly, the name is derived from the command name [Sys.argv.(0)]
   by replacing its extension with ".conf", or by appending ".conf" when
   the command name has no extension.
 *)
let config_filename cf =
  match cf.config_filename_opt with
    | Some f -> f
    | None ->
        let command_name = Sys.argv.(0) in
        ( try
            (Filename.chop_extension command_name) ^ ".conf"
          with
            (* [Filename.chop_extension] signals "no extension" with
               [Invalid_argument]. Only this case is handled, so that
               unrelated exceptions are not silently swallowed.
             *)
            | Invalid_argument _ -> command_name ^ ".conf"
        )

(* The explicitly configured file name, if any (no default is applied) *)
let config_filename_opt { config_filename_opt = name; _ } = name

(* The directly supplied configuration tree, if any *)
let config_tree_opt { config_tree_opt = tree; _ } = tree

(* The configured PID file, if any *)
let pidfile { pidfile = file; _ } = file

(* Whether the foreground flag is set *)
let foreground { foreground = fg; _ } = fg

(* [daemon f] runs [f] in a detached grandchild process. [f] gets a
   labeled argument [~init_done], a function it is expected to call once
   its startup work is finished; calling it sends SIGUSR1 to the process
   that invoked [daemon].

   In the invoking process, [daemon] returns after that process has been
   woken up from [Unix.sigsuspend]. In the grandchild, [daemon] returns
   when [f] returns. Fails with [Failure] on Win32.
 *)
let daemon f =
  (* Double fork to avoid becoming a pg leader. The outer process waits
     until the most important initializations of the child are done
     (e.g. master sockets are created).
   *)
  if is_win32 then
    failwith "Startup as daemon is unsupported on Win32 - use -fg switch";
  (* Block SIGUSR1 and SIGCHLD before forking; the previous mask is kept
     so that both the grandchild and the outer process can restore it.
     Presumably this keeps an early SIGUSR1 pending until the outer
     process reaches sigsuspend below.
   *)
  let oldmask = Unix.sigprocmask Unix.SIG_BLOCK [Sys.sigusr1; Sys.sigchld] in
  (* PID of the outer process: the target of the init_done notification *)
  let pid = Unix.getpid() in
  match Unix.fork() with
    | 0 ->
        (* Middle process: forks once more and exits immediately *)
        ( match Unix.fork() with
            | 0 ->
                (* Grandchild: this is the actual daemon *)
                ignore(Unix.sigprocmask Unix.SIG_SETMASK oldmask);
                let _ = Unix.setsid() in (* Start new session/get rid of tty *)
                (* Assign stdin/stdout to /dev/null. Each descriptor is
                   closed and /dev/null is opened right afterwards; this
                   relies on openfile reusing the descriptor number that
                   was just freed.
                 *)
                Unix.close Unix.stdin;
                ignore(Unix.openfile "/dev/null" [ Unix.O_RDONLY ] 0);
                Unix.close Unix.stdout;
                ignore(Unix.openfile "/dev/null" [ Unix.O_WRONLY ] 0);
                (* Keep stderr open: error messages should appear *)
		Netsys_posix.run_post_fork_handlers();
                f ~init_done:(fun () -> Unix.kill pid Sys.sigusr1)
            | _ ->
                Netsys._exit 0
        )
    | middle_pid ->
        (* Outer process. Wait for zombie (the middle process): *)
        ignore(Netsys.restart (Unix.waitpid []) middle_pid);
        (* Wait for SIGUSR1, but ignore SIGCHLD. A no-op handler is
           installed for SIGUSR1, and sigsuspend waits with a mask in
           which only SIGCHLD is blocked.
           NOTE(review): the SIGUSR1 handler is not reset afterwards, and
           nothing visible here wakes this process up if the grandchild
           never calls init_done.
         *)
        Sys.set_signal Sys.sigusr1 (Sys.Signal_handle (fun _ -> ()));
        Unix.sigsuspend [ Sys.sigchld ];
        ignore(Unix.sigprocmask Unix.SIG_SETMASK oldmask);
;;


(* Run the event system of the controller until [Unixqueue.run] returns
   normally. Any exception escaping from it is logged at level `Crit
   (together with the backtrace, when one is available), and the event
   system is then entered again.
 *)
let rec run_controller ctrl =
  let crit message =
    ctrl # logger # log
      ~component:"netplex.controller"
      ~level:`Crit
      ~message in
  try
    Unixqueue.run ctrl#event_system
  with
    | error ->
        (* Fetch the backtrace first, before any other call is made *)
        let bt = Printexc.get_backtrace() in
        crit ("Uncaught exception: " ^ Netexn.to_string error);
        if bt <> "" then
          crit ("Backtrace: " ^ bt);
        run_controller ctrl
;;


(* Return [(config_file, netplex_config)]. The [config_file] is taken
   from the configuration tree in [cf] if one is set; otherwise it is
   obtained by applying [config_parser] to the configuration file name.
   The [netplex_config] is read from it with
   [Netplex_config.read_netplex_config].
 *)
let get_config_pair ~config_parser par c_logger_cf c_wrkmg_cf c_proc_cf cf =
  let filename = config_filename cf in
  let config_file =
    match cf.config_tree_opt with
      | Some tree ->
          Netplex_config.repr_config_file filename tree
      | None ->
          config_parser filename in
  let netplex_config =
    Netplex_config.read_netplex_config
      par#ptype
      c_logger_cf c_wrkmg_cf c_proc_cf
      config_file in
  (config_file, netplex_config)


(* If a PID file is configured, write the ID of the current process to
   it, and return a function that removes the file again (best effort:
   a [Sys_error] while removing is ignored). Without a PID file, the
   returned function does nothing.

   Raises [Sys_error] if the PID file cannot be created or written.
 *)
let handle_pidfile cf =
  match cf.pidfile with
    | Some file ->
         (* Remember the file by absolute path. The working directory may
            change between writing and removing the file ([startup] does
            a chdir to "/"), and a relative name would then point to a
            different location.
          *)
         let file =
           if Filename.is_relative file then
             ( try Filename.concat (Sys.getcwd()) file
               with Sys_error _ -> file
             )
           else
             file in
         let f = open_out file in
         ( try
             fprintf f "%d\n" (Unix.getpid());
             close_out f
           with
             | error ->
                  (* Do not leak the channel when writing fails *)
                  close_out_noerr f;
                  raise error
         );
         (fun () ->
            try Sys.remove file with Sys_error _ -> ())
    | None ->
         (fun () -> ())

(* Run [f()] while [Netlog.current_logger] and
   [Netlog.Debug.current_dlogger] are replaced. The previous values of
   both are put back when [f] returns, and also when it raises an
   exception (which is then re-raised). Returns the result of [f].
 *)
let redirect_logger f =
  let saved_logger = !Netlog.current_logger
  and saved_dlogger = !Netlog.Debug.current_dlogger in
  let restore() =
    Netlog.current_logger := saved_logger;
    Netlog.Debug.current_dlogger := saved_dlogger in

  (* Log via Netplex if possible (this also works from the controller
     thread), and fall back to the previous logger otherwise
   *)
  let netplex_logger level message =
    try
      Netplex_cenv.log level message
    with
      | Netplex_cenv.Not_in_container_thread ->
           saved_logger level message in
  (* Netlog.Debug cannot be handled by netplex, so debug messages are
     written to stderr
   *)
  let stderr_dlogger mname msg =
    Netlog.channel_logger stderr `Debug `Debug (mname ^ ": " ^ msg) in

  Netlog.current_logger := netplex_logger;
  Netlog.Debug.current_dlogger := stderr_dlogger;
  let result =
    try
      f()
    with
      | error ->
           restore();
           raise error in
  restore();
  result


(* Create the processors, workload managers and socket services for all
   services listed in [netplex_config], and add them to [controller].

   This happens in three passes over the service list:
   - All processors are created. An exception raised here is not caught
     and thus prevents the startup of the whole system.
   - For every service, the workload manager and the socket service are
     created. An exception is logged at level `Crit, and the service is
     left out.
   - The remaining services are added to the controller. An exception is
     again only logged.
 *)
let setup_controller config_file netplex_config controller_config controller =
  let log_crit message =
    controller # logger # log
      ~component:"netplex.controller"
      ~level:`Crit
      ~message in

  (* Pass 1: processors (exceptions are fatal) *)
  let make_processor (_, (procaddr, c_proc_cfg), _) =
    c_proc_cfg # create_processor
                   controller_config config_file procaddr in
  let processors =
    List.map make_processor netplex_config#services in

  (* Pass 2: workload managers and sockets. An error while creating the
     sockets is quite problematic. The service is not added, but the
     system startup cannot be prevented at that late point in time.
   *)
  let make_service (sockserv_cfg, _, (wrkmngaddr, c_wrkmng_cfg)) processor =
    try
      let wrkmng =
        c_wrkmng_cfg # create_workload_manager
                         controller_config config_file wrkmngaddr in
      let sockserv =
        Netplex_sockserv.create_socket_service
          processor sockserv_cfg in
      Some (sockserv, wrkmng)
    with
      | error ->
           log_crit ("Uncaught exception preparing service " ^
                       sockserv_cfg#name ^ ": " ^
                         Netexn.to_string error);
           None in
  let services =
    List.map2 make_service netplex_config#services processors in

  (* Pass 3: hand the services over to the controller *)
  let register =
    function
      | None ->
           ()
      | Some(sockserv,wrkmng) ->
           ( try
               controller # add_service sockserv wrkmng
             with
               | error ->
                    (* An error is very problematic now... *)
                    log_crit ("Uncaught exception adding service " ^
                                sockserv#name ^ ": " ^
                                  Netexn.to_string error)
           ) in
  List.iter register services


(* Read the configuration and run the Netplex system until the
   controller finishes. Unless the foreground flag is set in [cf], the
   system runs in a daemon process created with [daemon].

   [late_initializer] is called after the services have been set up. An
   exception raised by it is logged at level `Crit and does not stop the
   startup. [config_parser] turns the configuration file name into the
   configuration file representation.

   The PID file (if configured) is written by the process running the
   controller, and removed when the controller has finished or when an
   exception is raised; the exception is re-raised after that.
 *)
let startup ?(late_initializer = fun _ _ -> ())
            ?(config_parser = Netplex_config.read_config_file)
            par c_logger_cf c_wrkmg_cf c_proc_cf cf =
  let (config_file, netplex_config) =
    get_config_pair
      ~config_parser par c_logger_cf c_wrkmg_cf c_proc_cf cf in

  (* The part that runs in the final process. [init_done] is called when
     the initialization is over.
   *)
  let serve ~init_done =
    let remove_pid_file = handle_pidfile cf in
    try
      let controller_config = netplex_config # controller_config in

      let controller =
        Netplex_controller.create_controller
          par controller_config in
      Netplex_cenv.register_ctrl controller;

      let prop = Netplex_sharedvar.global_propagator() in
      Netsys_global.set_propagator (Some prop);

      (* Change to / so we don't block filesystems without need.
         Do this after controller creation so the controller has a
         chance to remember the cwd
       *)
      Unix.chdir "/";  (* FIXME Win32: Something like c:/ *)

      let log_crit message =
        controller # logger # log
          ~component:"netplex.controller"
          ~level:`Crit
          ~message in

      (* Errors of the late initializer are only logged *)
      let guarded_late_init() =
        try
          late_initializer config_file controller
        with
          | error ->
               let bt = Printexc.get_backtrace() in
               log_crit ("Uncaught exception in late initialization: " ^
                           Netexn.to_string error);
               if bt <> "" then
                 log_crit ("Backtrace: " ^ bt) in

      redirect_logger
        (fun () ->
           setup_controller
             config_file netplex_config controller_config controller;
           guarded_late_init();

           init_done();

           run_controller controller;
           controller # free_resources();
           Netplex_cenv.unregister_ctrl controller;
           Netsys_global.set_propagator None;
           Netplex_sharedvar.propagate_back controller;
           remove_pid_file()
        )
    with
      | error ->
           remove_pid_file();
           raise error in

  if cf.foreground then
    (* Nobody is waiting for the end of the initialization *)
    serve ~init_done:(fun () -> ())
  else
    daemon serve
;;

(* Read the configuration and run the Netplex system in the current
   process (no daemonization) until the controller finishes. Returns
   [extract_result controller late_value], where [late_value] is the
   value returned by [late_initializer config_file controller].

   Unlike in [startup], an exception from [late_initializer] is logged
   and then re-raised. When an exception leaves the main part, the
   cleanup actions registered so far are run (most recently registered
   first); exceptions from cleanup actions are only reported on stderr.
   The original exception is re-raised afterwards.
 *)
let run ?(config_parser = Netplex_config.read_config_file)
        ~late_initializer ~extract_result
         par c_logger_cf c_wrkmg_cf c_proc_cf cf =
  let (config_file, netplex_config) =
    get_config_pair
      ~config_parser par c_logger_cf c_wrkmg_cf c_proc_cf cf in

  let remove_pid_file = handle_pidfile cf in

  (* Actions to run when an exception occurs *)
  let pending = ref [ remove_pid_file ] in
  let at_failure action =
    pending := action :: !pending in
  let run_guarded action =
    try
      action()
    with
      | e ->
           eprintf "Exception in cleanup after exception: %s\n%!"
                   (Netexn.to_string e) in

  try
    let controller_config = netplex_config # controller_config in

    let controller =
      Netplex_controller.create_controller
        par controller_config in
    Netplex_cenv.register_ctrl controller;
    at_failure (fun () -> Netplex_cenv.unregister_ctrl controller);

    let log_crit message =
      controller # logger # log
        ~component:"netplex.controller"
        ~level:`Crit
        ~message in

    redirect_logger
      (fun () ->
         setup_controller
           config_file netplex_config controller_config controller;
         at_failure (controller#free_resources);
         let late_value =
           try
             late_initializer config_file controller
           with
             | error ->
                  (* Log the problem, but let the exception pass *)
                  let bt = Printexc.get_backtrace() in
                  log_crit ("Uncaught exception in late initialization: " ^
                              Netexn.to_string error);
                  if bt <> "" then
                    log_crit ("Backtrace: " ^ bt);
                  raise error in
         run_controller controller;
         let result = extract_result controller late_value in
         controller # free_resources();
         Netplex_cenv.unregister_ctrl controller;
         remove_pid_file();
         result
      )
  with
    | error ->
         List.iter run_guarded !pending;
         raise error
;;