1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
(* $Id$ *)
(* This example demonstrates how to start a helper container (i.e. an
additional thread or process). The helper container does not create
a socket, so we must use internal IPC methods.
This module is the server program. Use test_client as client.
The [operation] just returns the passed string in this example (identity).
As side effect, the number of invocations are counted. The task of the
helper container is to print this number once each minute to stdout.
Here, the helper container is socket-less. This is for the purpose
of this example - in a more real-world scenario it is not uncommon
that helpers also have sockets, and define RPCs one can call. Note
that internal sockets (socketpairs) are always created, but these
cannot be accessed from outside.
As every container needs a service definition, the question is how to
provide it. Here, we simply add it to the controller at program startup,
i.e. it does not appear in the configuration file. Note that the name
of the service is nevertheless a global name, and one should be careful
not to assign the same name twice.
So we have two services:
- [worker]: Defined in the configuration file. The containers of this
service execute [operation], and count the number of invocations.
- [helper]: Defined programmatically. There is only one container,
and this container prints the number once every minute.
For counting, we simply use counters provided by
[Netplex_semaphore].
This example is very interesting for multi-threaded programs, because
it shows how to create threads inside the Netplex environment. There
is a lot of formalism involved, but the advantage is that the threads
can use Netplex functions:
- They can print log messages using [Netplex_cenv.log]
- They can use Netplex plugins like [Netplex_semaphore]
- They can be managed by netplex_admin (shutdown, restart, list of
active containers)
*)
open Printf
(* --- worker --- *)
let proc_operation s =
ignore(Netplex_semaphore.increment "worker.operation_count");
s
let setup srv _ =
Operation_srv.P.V.bind
~proc_null:(fun () -> ())
~proc_operation
srv
(* --- helper --- *)
let set_up_timer() =
ignore
(Netplex_cenv.create_timer
(fun _ ->
let v = Netplex_semaphore.get "worker.operation_count" in
Netlog.logf `Info "operation count: %Ld" v;
true
)
60.0)
(* --- main --- *)
let start() =
let (opt_list, cmdline_cfg) = Netplex_main.args() in
let use_mt = ref false in
let debug = ref false in
let opt_list' =
[ "-mt", Arg.Set use_mt,
" Use multi-threading instead of multi-processing";
"-debug", Arg.Set debug,
" Enable (some) debug messages";
] @ opt_list in
Arg.parse
opt_list'
(fun s -> raise (Arg.Bad ("Don't know what to do with: " ^ s)))
(sprintf "usage: %s [options]" (Filename.basename Sys.argv.(0)));
if !debug then (
()
);
let parallelizer =
if !use_mt then
Netplex_mt.mt() (* multi-threading *)
else
Netplex_mp.mp() in (* multi-processing *)
let worker_hooks =
( object(self)
inherit Netplex_kit.empty_processor_hooks()
method post_add_hook _ ctrl =
ctrl # add_plugin Netplex_semaphore.plugin
method post_start_hook _ =
let _cr_flag =
Netplex_semaphore.create
~protected:false
"worker.operation_count"
0L in
()
end
) in
let worker_factory =
Rpc_netplex.rpc_factory
~configure:(fun _ _ -> ())
~name:"worker"
~setup
~hooks:(fun _ -> worker_hooks)
() in
let helper_hooks =
( object
inherit Netplex_kit.empty_processor_hooks ()
method post_add_hook _ ctrl =
ctrl # add_plugin Netplex_semaphore.plugin
method post_start_hook _ =
set_up_timer()
end
) in
Netplex_main.startup
~late_initializer:(
fun _ ctrl ->
(* The helper isn't added via a factory, but directly to the
controller using this special function:
*)
Netplex_kit.add_helper_service ctrl "helper" helper_hooks
)
parallelizer
Netplex_log.logger_factories (* allow all built-in logging styles *)
Netplex_workload.workload_manager_factories (* ... all ways of workload management *)
[ worker_factory ] (* make this service type available *)
cmdline_cfg
let () =
Netsys_signal.init();
start()
|