1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
(* Server:
*
* In order to define a server the various remote procedure numbers must be
* bound to real O'Caml functions. There are two ways of doing this:
* - binding as synchronous call: The function returns immediately the result.
* - binding as asynchronous call: The function receives the call but need
* not to produce a reply immediately. The function can do further I/O,
* for example call another remote procedure without blocking the server,
* and reply when the answer is available.
*
* Here, the procedure "plus1" and "sortarray" are implemented as synchronous
* calls while "sortlist" works in an asynchronous way. "sortlist" first
* converts its argument list into an array, calls then "sortarray" and
* converts the result back. (This is a bit artificial...)
*)
open Rtypes
open Xdr
open Rpc
open Procdef
open Rpc_server
let server_port = ref 0;;
(* this variable will contain the Internet port where this server is
* listening
*)
(***** Implementation of the procedures *****)
let plus1 (XV_int n) =
(* Note that 'n' is a 32 bit signed integer. On 32 bit architectures,
* such numbers cannot always be represented as O'Caml uses 31 bit signed
* integers. So the following might generate a 'Cannot_represent' exception.
* Note that this exception does not terminate the server.
*)
XV_int (int4_of_int (int_of_int4 n + 1))
;;
let sortarray (XV_array l) =
Sort.array ( <= ) l;
XV_array l
;;
let sortlist session l =
(* some conversion functions: *)
let rec convert_to_plain_list l =
match l with
XV_union_over_enum ("FALSE", _) -> []
| XV_union_over_enum ("TRUE", XV_struct ["value", v; "next", l']) ->
v :: convert_to_plain_list l'
in
let rec convert_to_xdr_list l =
match l with
[] -> XV_union_over_enum("FALSE", XV_void)
| x::l' -> XV_union_over_enum("TRUE", XV_struct ["value", x;
"next", convert_to_xdr_list l'
])
in
let l_as_array = XV_array (Array.of_list(convert_to_plain_list l)) in
let esys = Rpc_server.get_event_system session in
(* the event system behind the server *)
(* Call this server recursively. To do so, we need a 'client' that is
* attached to the same event system as the server.
*)
let client =
Rpc_client.create
esys
(Rpc_client.InetNumeric (127,0,0,1, !server_port))
Tcp
program
in
Rpc_client.configure client 0 10.0; (* set a timeout of 10 seconds *)
(* Push the call onto the queue: *)
Rpc_client.add_call
client
"sortarray"
l_as_array
(fun get_result ->
(* This function is called when the result has been arrived. *)
try
(* Obtain the result and convert it back: *)
let XV_array result = get_result () in
let result_as_xdr_list = convert_to_xdr_list (Array.to_list result) in
(* Push the result onto the reply queue: *)
Rpc_server.reply session result_as_xdr_list;
(* The client is not needed any longer: *)
Rpc_client.shut_down client;
with
any_exception ->
(* Shut down the client in this case, too: *)
Rpc_client.shut_down client;
(* Print the exception: *)
prerr_endline ("sortlist exception: " ^
Printexc.to_string any_exception);
raise any_exception)
(* This was all. The rest is done in an event-driven way. *)
;;
(***** Building a server *****)
let main() =
let esys = Unixqueue.create_unix_event_system() in
(* esys: the "event system", i.e. the means where events arrive and are
* processed by forwarding them to event handlers. "esys" contains an
* event queue of unprocessed events so far, a set of handlers and
* a set of resources which are conditions on file descriptors producing
* events.
*)
let server =
Rpc_server.create
esys
Rpc_server.Portmapped (* register with the portmapper *)
Tcp
Socket
program
[ Sync { sync_name = "plus1"; (* bind "plus1" *)
sync_proc = plus1 };
Sync { sync_name = "sortarray"; (* bind "sortarray" *)
sync_proc = sortarray };
Async { async_name = "sortlist"; (* bind "sortlist" *)
async_invoke = sortlist } ]
100 (* maximum number of parallel connections *)
in
(* Set signal handler. Signals are the only way to stop the server;
* the default behaviour does not clean up the server, so we define
* appropriate handlers.
* Clean-up to do is mostly unregistering the program with the portmapper.
*)
List.iter
(fun signal ->
Sys.set_signal
signal
(Sys.Signal_handle (fun _ -> Rpc_server.stop_server server)))
[ Sys.sighup; Sys.sigint; Sys.sigquit; Sys.sigterm ];
(* Initialize the 'server_port' variable *)
server_port := Rpc_portmapper.port_of_program program "localhost" Tcp;
(* Now start serving *)
Unixqueue.run esys
;;
(***** running the server *****)
main();;
|