File: server.ml

package info (click to toggle)
camlrpc 0.4.1-7
  • links: PTS
  • area: main
  • in suites: sarge
  • size: 1,080 kB
  • ctags: 1,474
  • sloc: ml: 11,901; makefile: 592; sh: 345; ansic: 331
file content (174 lines) | stat: -rw-r--r-- 4,933 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
(* Server:
 *
 * In order to define a server the various remote procedure numbers must be
 * bound to real O'Caml functions. There are two ways of doing this:
 * - binding as synchronous call: The function returns immediately the result.
 * - binding as asynchronous call: The function receives the call but need
 *     not to produce a reply immediately. The function can do further I/O,
 *     for example call another remote procedure without blocking the server,
 *     and reply when the answer is available.
 *
 * Here, the procedure "plus1" and "sortarray" are implemented as synchronous
 * calls while "sortlist" works in an asynchronous way. "sortlist" first
 * converts its argument list into an array, calls then "sortarray" and
 * converts the result back. (This is a bit artificial...)
 *)

open Rtypes
open Xdr
open Rpc
open Procdef
open Rpc_server

let server_port = ref 0;;
  (* this variable will contain the Internet port where this server is
   * listening
   *)



(***** Implementation of the procedures *****)



let plus1 (XV_int n) =
  (* Note that 'n' is a 32 bit signed integer. On 32 bit architectures,
   * such numbers cannot always be represented as O'Caml uses 31 bit signed
   * integers. So the following might generate a 'Cannot_represent' exception.
   * Note that this exception does not terminate the server.
   *)

  XV_int (int4_of_int (int_of_int4 n + 1))
;;


let sortarray (XV_array l) =
  Sort.array ( <= ) l;
  XV_array l
;;


let sortlist session l =
  (* some conversion functions: *)

  let rec convert_to_plain_list l =
    match l with
      XV_union_over_enum ("FALSE", _) -> []
    | XV_union_over_enum ("TRUE", XV_struct ["value", v; "next", l']) ->
	v :: convert_to_plain_list l'
  in

  let rec convert_to_xdr_list l =
    match l with
      []    -> XV_union_over_enum("FALSE", XV_void)
    | x::l' -> XV_union_over_enum("TRUE", XV_struct ["value", x; 
						      "next", convert_to_xdr_list l'
						    ])
  in

  let l_as_array = XV_array (Array.of_list(convert_to_plain_list l)) in

  let esys = Rpc_server.get_event_system session in
  (* the event system behind the server *)
  
  (* Call this server recursively. To do so, we need a 'client' that is 
   * attached to the same event system as the server.
   *)

  let client = 
    Rpc_client.create 
      esys
      (Rpc_client.InetNumeric (127,0,0,1, !server_port))
      Tcp
      program
  in

  Rpc_client.configure client 0 10.0;        (* set a timeout of 10 seconds *)

  (* Push the call onto the queue: *)

  Rpc_client.add_call 
    client
    "sortarray"
    l_as_array
    (fun get_result ->
      (* This function is called when the result has been arrived. *)
      try
	(* Obtain the result and convert it back: *)
	let XV_array result = get_result () in
	let result_as_xdr_list = convert_to_xdr_list (Array.to_list result) in
	(* Push the result onto the reply queue: *)
	Rpc_server.reply session result_as_xdr_list;
	(* The client is not needed any longer: *)
	Rpc_client.shut_down client;
      with
	any_exception ->
	  (* Shut down the client in this case, too: *)
	  Rpc_client.shut_down client;
	  (* Print the exception: *)
	  prerr_endline ("sortlist exception: " ^ 
			 Printexc.to_string any_exception);
	  raise any_exception)

  (* This was all. The rest is done in an event-driven way. *)
;;



(***** Building a server *****)



let main() =
  let esys = Unixqueue.create_unix_event_system() in

     (* esys: the "event system", i.e. the means where events arrive and are
      * processed by forwarding them to event handlers. "esys" contains an
      * event queue of unprocessed events so far, a set of handlers and
      * a set of resources which are conditions on file descriptors producing
      * events.
      *)

  let server =
    Rpc_server.create
      esys
      Rpc_server.Portmapped          (* register with the portmapper *)
      Tcp
      Socket
      program
      [ Sync  { sync_name = "plus1";        (* bind "plus1" *)
		sync_proc = plus1 };
	Sync  { sync_name = "sortarray";    (* bind "sortarray" *)
		sync_proc = sortarray };
	Async { async_name = "sortlist";    (* bind "sortlist" *)
		async_invoke = sortlist } ]
     100                          (* maximum number of parallel connections *)
  in

  (* Set signal handler. Signals are the only way to stop the server;
   * the default behaviour does not clean up the server, so we define
   * appropriate handlers. 
   * Clean-up to do is mostly unregistering the program with the portmapper.
   *)

  List.iter
    (fun signal ->
      Sys.set_signal 
        signal
	(Sys.Signal_handle (fun _ -> Rpc_server.stop_server server)))
    [ Sys.sighup; Sys.sigint; Sys.sigquit; Sys.sigterm ];

  (* Initialize the 'server_port' variable *)

  server_port := Rpc_portmapper.port_of_program program "localhost" Tcp;

  (* Now start serving *)

  Unixqueue.run esys
;;


(***** running the server *****)


main();;