1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
|
(* $Id$ *)
(* This example needs OCaml-4.02 or newer *)
open Netplex_types
open Printf
type message =
| Request of string list
| Response of string
type _ polysocket_kind +=
| Tmessage : message polysocket_kind
let concat_service_factory() : processor_factory =
( object
method name = "concat_service"
method create_processor ctrl_cfg cf addr =
( object (self)
inherit Netplex_kit.empty_processor_hooks()
method supported_ptypes = [ `Multi_threading ]
method process ~when_done cont fd proto =
failwith "process: not supported for an internal-only service"
method process_internal ~when_done cont srvbox proto =
let Polyserver_box(kind, srv) = srvbox in
match kind with
| Tmessage ->
let (rd,wr) =
Netsys_polysocket.accept ~nonblock:false srv in
( try
while true do
let msg =
Netsys_polypipe.read ~nonblock:false rd in
let req =
match msg with
| Some(Request l) -> l
| Some Response _ ->
failwith "process_internal: got Response"
| None ->
raise End_of_file in
printf "Server: processing message\n%!";
let resp = String.concat "/" req in
Netsys_polypipe.write
~nonblock:false wr (Some (Response resp));
done;
with End_of_file ->
printf "Server: got EOF, now closing\n%!";
Netsys_polypipe.write
~nonblock:false wr None; (* respond with EOF *)
(* Not closing rd/wr. This is the task of the client *)
when_done()
)
| _ ->
failwith "process_internal: wrong kind"
method config_internal =
[ "my_protocol", Polysocket_kind_box Tmessage ]
end
)
end
)
let rpc_service_factory() : Netplex_types.processor_factory =
let proc_operation s =
let l = String.length s in
let u = Bytes.create l in
for k = 0 to l-1 do
Bytes.set u k s.[l-1-k]
done;
Bytes.to_string u in
let setup srv _ =
Operation_srv.P.V.bind
~proc_null:(fun () -> ())
~proc_operation
srv in
Rpc_netplex.rpc_factory
~configure:(fun _ _ -> ())
~name:"rpc_service"
~setup
~hooks:(fun _ -> new Netplex_kit.empty_processor_hooks())
()
let same : type s t . s polysocket_kind * t polysocket_kind -> (s,t) eq =
function
| Tmessage, Tmessage -> Equal
| Txdr, Txdr -> Equal
| _ -> Not_equal
let kind_check =
fun k ->
same (Tmessage,k)
let client_hooks =
( object
inherit Netplex_kit.empty_processor_hooks ()
method post_start_hook _ =
(* This code is run in a different thread. Create here a client and
check the internal service out
*)
let client =
Netplex_internal.connect_client
{ Netplex_types.kind_check = fun k -> same (Tmessage,k) }
1
"my_server_identifier" in
let (rd,wr) =
Netsys_polysocket.endpoint ~synchronous:true ~nonblock:false client in
let req =
Request [ "abc"; "123"; "XYZ" ] in
Netsys_polypipe.write ~nonblock:false wr (Some req);
let resp =
Netsys_polypipe.read ~nonblock:false rd in
( match resp with
| Some(Response s) ->
printf "Client: Got response: %s\n%!" s
| Some(Request _) ->
failwith "got request back"
| None ->
failwith "got EOF"
);
(* now send eof *)
Netsys_polypipe.write ~nonblock:false wr None;
(* now await eof *)
while
Netsys_polypipe.read ~nonblock:false rd <> None
do () done;
printf "Client: Got EOF\n%!";
Netsys_polypipe.close rd;
Netsys_polypipe.close wr;
Netsys_polysocket.close_client client;
printf "Client: done\n%!";
(* Now connect to operation_service, an RPC-based internal service *)
let other_client =
Netplex_internal.connect_client
{ Netplex_types.kind_check = fun k -> same (Txdr,k) }
1
"rpc_server_identifier" in
let rpc_client =
Operation_clnt.P.V.create_client2
(`Internal_socket other_client) in
let r = Operation_clnt.P.V.operation rpc_client "abcdef" in
printf "Client: RPC result = %s\n%!" r;
Rpc_client.shut_down rpc_client;
Netplex_cenv.system_shutdown()
end
)
let main() =
let (opt_list, cmdline_cfg) = Netplex_main.args() in
let opt_list' =
[ "-debug", Arg.String (fun s -> Netlog.Debug.enable_module s),
"<module> Enable debug messages for <module>";
"-debug-all", Arg.Unit (fun () -> Netlog.Debug.enable_all()),
" Enable all debug messages";
"-debug-list", Arg.Unit (fun () ->
List.iter print_endline (Netlog.Debug.names());
exit 0),
" Show possible modules for -debug, then exit";
] @ opt_list in
Arg.parse
opt_list'
(fun s -> raise (Arg.Bad ("Don't know what to do with: " ^ s)))
(sprintf "usage: %s [options]" (Filename.basename Sys.argv.(0)));
let parallelizer = Netplex_mt.mt() in
Netplex_main.startup
~late_initializer:(
fun _ ctrl ->
Netplex_kit.add_helper_service ctrl "client" client_hooks
)
parallelizer
Netplex_log.logger_factories (* allow all built-in logging styles *)
Netplex_workload.workload_manager_factories (* ... all ways of workload management *)
[ concat_service_factory();
rpc_service_factory();
]
cmdline_cfg
let () =
Printexc.record_backtrace true;
Netsys_signal.init();
main()
|