1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
|
(* $Id$ *)
(* This example demonstrates some very basic features of Netplex:
- Starting an "empty service", i.e. one that does not provide
any network functionality. Nevertheless a service process is
forked, and all the hook functions are executed.
- Echo service: Opens a network port, and echos all lines
sent to it
- Simple RPC server: Implements the RPC "operation" from operation.x
All three services are started from a single program.
Note that there is a fundamental difference between "echo" and "operation":
As echo is written in synchronous (blocking) style, and we have only
one process, there can be at most one TCP connection to this service.
In contrast to this, "operation" can serve many connections in parallel.
This is a functionality of the RPC layer.
Test helloworld:
$ ./helloworld -fg -conf helloworld.cfg
Then connect to "echo":
$ netcat localhost 4343
Connect to "operation":
$ ./test_client -port 4444 foo
Use netplex-admin for administration:
$ ../../src/netplex/netplex-admin -list
operation: Enabled 1 containers
rpc/operation @ inet:0.0.0.0:4444
echo: Enabled 1 containers
echo_proto @ inet:0.0.0.0:4343
empty: Enabled 1 containers
dummy @ -
netplex.controller: Enabled 1 containers
admin @ local:/tmp/.netplex/netplex.controller/admin
$ ../../src/netplex/netplex-admin -containers
operation: Enabled 1 containers
rpc/operation @ inet:0.0.0.0:4444
Process 29390: selected
echo: Enabled 1 containers
echo_proto @ inet:0.0.0.0:4343
Process 29389: selected
empty: Enabled 1 containers
dummy @ -
Process 29388: selected
netplex.controller: Enabled 1 containers
admin @ local:/tmp/.netplex/netplex.controller/admin
AttachedToCtrlProcess 29387: selected
$ ../../src/netplex/netplex-admin -restart empty
(and watch the log messages from "empty")
$ ../../src/netplex/netplex-admin foo bar baz
(and watch the log messages from all services)
$ ../../src/netplex/netplex-admin -shutdown
(and watch the log messages from all services)
*)
open Printf
(**********************************************************************)
(* hello_hooks:
Define the processor hooks so that a message is logged
for each. Normally, one inherits from
Netplex_kit.empty_processor_hooks and defines only the
hooks that are needed.
*)
class hello_hooks service_name : Netplex_types.processor_hooks =
object(self)
inherit Netplex_kit.empty_processor_hooks()
method post_add_hook _ _ =
Netlog.logf `Info "%s: post_add_hook" service_name
method post_rm_hook _ _ =
Netlog.logf `Info "%s: post_rm_hook" service_name
method pre_start_hook _ _ _ =
Netlog.logf `Info "%s: pre_start_hook" service_name
method post_start_hook _ =
Netlog.logf `Info "%s: post_start_hook" service_name
method pre_finish_hook _ =
Netlog.logf `Info "%s: pre_finish_hook" service_name
method post_finish_hook _ _ _ =
Netlog.logf `Info "%s: post_finish_hook" service_name
method receive_message _ msg args =
Netlog.logf `Info "%s: receive_message(\"%s\", [%s])"
service_name
(String.escaped msg)
(String.concat ","
(List.map
(fun arg -> "\"" ^ String.escaped arg ^ "\"")
(Array.to_list args)))
method receive_admin_message _ msg args =
Netlog.logf `Info "%s: receive_admin_message(\"%s\", [%s])"
service_name
(String.escaped msg)
(String.concat ","
(List.map
(fun arg -> "\"" ^ String.escaped arg ^ "\"")
(Array.to_list args)))
method system_shutdown () =
Netlog.logf `Info "%s: system_shutdown" service_name
method shutdown() =
Netlog.logf `Info "%s: shutdown" service_name
method global_exception_handler e =
Netlog.logf `Info "%s: global_exception_handler(%s)"
service_name
(Netexn.to_string e);
true
end
(**********************************************************************)
(* Empty service *)
(**********************************************************************)
let empty_service_factory() : Netplex_types.processor_factory =
( object
method name = "empty_service"
method create_processor ctrl_cfg cf addr =
( object (self)
inherit hello_hooks "empty_service"
method supported_ptypes = [ `Multi_processing; `Multi_threading ]
(* We don't expect that process is called. *)
method process ~when_done cont fd proto =
Netlog.logf `Info "empty_service: process(%s)" proto;
Unix.close fd;
when_done()
end
)
end
)
(**********************************************************************)
(* Echo service *)
(**********************************************************************)
let echo_service_factory() : Netplex_types.processor_factory =
( object
method name = "echo_service"
method create_processor ctrl_cfg cf addr =
( object (self)
inherit hello_hooks "echo_service"
method supported_ptypes = [ `Multi_processing; `Multi_threading ]
method process ~when_done cont fd proto =
Netlog.logf `Info "echo_service: process(%s)" proto;
(* fd is non-blocking, but we want it again blocking: *)
Unix.clear_nonblock fd;
(* We have to call when_done under all circumstances, so
catch exceptions here
*)
try
(* We cannot use here in_channel/out_channel as we have
a bidirectional connection. Netchannels has something
for us:
*)
let rch = new Netchannels.socket_descr fd in
(* On top of this, create buffered channels *)
let ich =
Netchannels.lift_in
(`Raw (rch :> Netchannels.raw_in_channel)) in
let och =
Netchannels.lift_out
(`Raw (rch :> Netchannels.raw_out_channel)) in
( try
while true do
(* Read a line from ich, echo to och: *)
let line = ich # input_line() in
och # output_string line;
och # output_char '\n';
och # flush();
done;
assert false (* don't reach this point *)
with
| End_of_file ->
(* We finally get End_of_file from input_line *)
ich # close_in();
och # close_out()
| error ->
ich # close_in();
och # close_out();
raise error
);
(* Done with it: *)
when_done()
with
| error ->
(* We have to ensure that when_done is always called,
even on error.
*)
Netlog.logf `Err
"Exception while echoing: %s" (Netexn.to_string error);
when_done();
(* We could raise the exception here again... *)
end
)
end
)
(**********************************************************************)
(* "Operation" *)
(**********************************************************************)
let operation_service_factory() : Netplex_types.processor_factory =
let proc_operation s =
let l = String.length s in
let u = String.create l in
for k = 0 to l-1 do
u.[k] <- s.[l-1-k]
done;
u in
let setup srv _ =
Operation_srv.P.V.bind
~proc_null:(fun () -> ())
~proc_operation
srv in
Rpc_netplex.rpc_factory
~configure:(fun _ _ -> ())
~name:"operation_service"
~setup
~hooks:(fun _ -> new hello_hooks "operation_service")
()
(**********************************************************************)
(* Main *)
(**********************************************************************)
let main() =
let (opt_list, cmdline_cfg) = Netplex_main.args() in
let use_mt = ref false in
let opt_list' =
[ "-mt", Arg.Set use_mt,
" Use multi-threading instead of multi-processing";
"-debug", Arg.String (fun s -> Netlog.Debug.enable_module s),
"<module> Enable debug messages for <module>";
"-debug-all", Arg.Unit (fun () -> Netlog.Debug.enable_all()),
" Enable all debug messages";
"-debug-list", Arg.Unit (fun () ->
List.iter print_endline (Netlog.Debug.names());
exit 0),
" Show possible modules for -debug, then exit";
] @ opt_list in
Arg.parse
opt_list'
(fun s -> raise (Arg.Bad ("Don't know what to do with: " ^ s)))
(sprintf "usage: %s [options]" (Filename.basename Sys.argv.(0)));
let parallelizer =
if !use_mt then
Netplex_mt.mt() (* multi-threading *)
else
Netplex_mp.mp() in (* multi-processing *)
Netplex_main.startup
parallelizer
Netplex_log.logger_factories (* allow all built-in logging styles *)
Netplex_workload.workload_manager_factories (* ... all ways of workload management *)
[ empty_service_factory();
echo_service_factory();
operation_service_factory()
]
cmdline_cfg
let () =
Netsys_signal.init();
main()
|