File: helloworld.ml

package info (click to toggle)
ocamlnet 4.1.9-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 54,024 kB
  • sloc: ml: 151,939; ansic: 11,071; sh: 2,003; makefile: 1,310
file content (298 lines) | stat: -rw-r--r-- 9,181 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
(* $Id$ *)

(* This example demonstrates some very basic features of Netplex:
    - Starting an "empty service", i.e. one that does not provide
      any network functionality. Nevertheless a service process is
      forked, and all the hook functions are executed.
    - Echo service: Opens a network port, and echoes all lines
      sent to it
    - Simple RPC server: Implements the RPC "operation" from operation.x

    All three services are started from a single program.

    Note that there is a fundamental difference between "echo" and "operation":
    As echo is written in synchronous (blocking) style, and we have only
    one process, there can be at most one TCP connection to this service.
    In contrast to this, "operation" can serve many connections in parallel.
    This is a functionality of the RPC layer.

    Test helloworld:
      $ ./helloworld -fg -conf helloworld.cfg

    Then connect to "echo":
      $ netcat localhost 4343

    Connect to "operation":
      $ ./test_client -port 4444 foo

    Use netplex-admin for administration:

      $ ../../src/netplex/netplex-admin -list
      operation: Enabled 1 containers
	  rpc/operation @ inet:0.0.0.0:4444
      echo: Enabled 1 containers
	  echo_proto @ inet:0.0.0.0:4343
      empty: Enabled 1 containers
	  dummy @ -
      netplex.controller: Enabled 1 containers
	  admin @ local:/tmp/.netplex/netplex.controller/admin

      $ ../../src/netplex/netplex-admin -containers
      operation: Enabled 1 containers
	  rpc/operation @ inet:0.0.0.0:4444
	  Process 29390: selected
      echo: Enabled 1 containers
	  echo_proto @ inet:0.0.0.0:4343
	  Process 29389: selected
      empty: Enabled 1 containers
	  dummy @ -
	  Process 29388: selected
      netplex.controller: Enabled 1 containers
	  admin @ local:/tmp/.netplex/netplex.controller/admin
	  AttachedToCtrlProcess 29387: selected

      $ ../../src/netplex/netplex-admin -restart empty
      (and watch the log messages from "empty")

      $ ../../src/netplex/netplex-admin foo bar baz
      (and watch the log messages from all services)

      $ ../../src/netplex/netplex-admin -shutdown
      (and watch the log messages from all services)
 *)

open Printf

(**********************************************************************)

(* hello_hooks:

   Processor hooks for demonstration purposes: each hook defined below
   writes one `Info line to Netlog, prefixed with service_name, and does
   nothing else. This makes the invocation of the hooks visible in the
   log output.

   In a real service one would normally inherit from
   Netplex_kit.empty_processor_hooks and override only the few hooks
   that are actually needed.
 *)

class hello_hooks service_name : Netplex_types.processor_hooks =
  (* Renders the argument array of a message as a comma-separated list
     of double-quoted, escaped strings (no surrounding brackets).
   *)
  let render_args args =
    let quote arg = "\"" ^ String.escaped arg ^ "\"" in
    String.concat "," (Array.to_list (Array.map quote args))
  in
  object
    inherit Netplex_kit.empty_processor_hooks()

    (* Hooks without interesting arguments: only the hook name is logged,
       all arguments are ignored.
     *)

    method post_add_hook _socksrv _ctrl =
      Netlog.logf `Info "%s: post_add_hook" service_name

    method post_rm_hook _socksrv _ctrl =
      Netlog.logf `Info "%s: post_rm_hook" service_name

    method pre_start_hook _socksrv _ctrl _cid =
      Netlog.logf `Info "%s: pre_start_hook" service_name

    method post_start_hook _cont =
      Netlog.logf `Info "%s: post_start_hook" service_name

    method pre_finish_hook _cont =
      Netlog.logf `Info "%s: pre_finish_hook" service_name

    method post_finish_hook _socksrv _ctrl _cid =
      Netlog.logf `Info "%s: post_finish_hook" service_name

    (* Message hooks: the message name and all arguments are logged in
       escaped form, so that unprintable characters cannot garble the
       log line.
     *)

    method receive_message _cont msg args =
      let name = String.escaped msg in
      let arg_list = render_args args in
      Netlog.logf `Info "%s: receive_message(\"%s\", [%s])"
        service_name name arg_list

    method receive_admin_message _cont msg args =
      let name = String.escaped msg in
      let arg_list = render_args args in
      Netlog.logf `Info "%s: receive_admin_message(\"%s\", [%s])"
        service_name name arg_list

    method system_shutdown () =
      Netlog.logf `Info "%s: system_shutdown" service_name

    method shutdown () =
      Netlog.logf `Info "%s: shutdown" service_name

    (* Logs the exception (at `Info level, like all other hooks) and
       always returns true.
       NOTE(review): presumably true tells Netplex that the exception
       has been handled - confirm against Netplex_types.
     *)
    method global_exception_handler exn =
      let descr = Netexn.to_string exn in
      Netlog.logf `Info "%s: global_exception_handler(%s)"
        service_name descr;
      true
  end


(**********************************************************************)
(* Empty service                                                      *)
(**********************************************************************)

(* empty_service_factory:

   Returns the processor factory named "empty_service". Every processor
   created by it inherits the logging hooks of hello_hooks and accepts
   both multi-processing and multi-threading.
 *)
let empty_service_factory() : Netplex_types.processor_factory =
  object
    method name = "empty_service"

    (* None of the three arguments is used by this service. *)
    method create_processor _ctrl_cfg _cf _addr =
      object
        inherit hello_hooks "empty_service"

        method supported_ptypes = [ `Multi_processing; `Multi_threading ]

        (* This method is not expected to be called for the empty
           service. Should it happen nevertheless, the event is logged,
           the descriptor is closed right away, and completion is
           reported via when_done.
         *)
        method process ~when_done _container conn protocol =
          Netlog.logf `Info "empty_service: process(%s)" protocol;
          Unix.close conn;
          when_done()
      end
  end

(**********************************************************************)
(* Echo service                                                       *)
(**********************************************************************)

(* echo_service_factory:

   Returns the processor factory named "echo_service". The processor
   reads lines from the connection and writes every line back, followed
   by a newline, until the peer closes its side (End_of_file).

   The processing is done in synchronous (blocking) style, i.e. process
   returns only after the connection is finished.
 *)
let echo_service_factory() : Netplex_types.processor_factory =
  ( object
      method name = "echo_service"

      method create_processor ctrl_cfg cf addr =
        ( object (self)
            inherit hello_hooks "echo_service"

            method supported_ptypes = [ `Multi_processing; `Multi_threading ]

            (* process ~when_done cont fd proto: Echoes all lines arriving
               on fd. proto is only used for the log message, and cont is
               not used at all.

               when_done has to be called under all circumstances. For
               this reason _all_ work on the descriptor is done inside
               the outer "try", and when_done is called exactly once
               after it, no matter whether the echo loop succeeded or
               not. Errors are logged at `Err level and are not re-raised.
             *)
            method process ~when_done cont fd proto =
              Netlog.logf `Info "echo_service: process(%s)" proto;
              ( try
                  (* fd is non-blocking, but we want it again blocking.
                     This is done inside the "try" so that a failure
                     here cannot prevent the when_done call below.
                   *)
                  Unix.clear_nonblock fd;
                  (* We cannot use here in_channel/out_channel as we have
                     a bidirectional connection. Netchannels has something
                     for us:
                   *)
                  let rch = new Netchannels.socket_descr fd in
                  (* On top of this, create buffered channels *)
                  let ich =
                    Netchannels.lift_in
                      (`Raw (rch :> Netchannels.raw_in_channel)) in
                  let och =
                    Netchannels.lift_out
                      (`Raw (rch :> Netchannels.raw_out_channel)) in
                  (* NOTE(review): if one of the three constructions above
                     raises, fd is not closed by this method - confirm
                     whether the caller cleans up in this case.
                   *)
                  ( try
                      (* The loop is only left by an exception *)
                      while true do
                        (* Read a line from ich, echo to och: *)
                        let line = ich # input_line() in
                        och # output_string line;
                        och # output_char '\n';
                        och # flush();
                      done
                    with
                      | End_of_file ->
                          (* We finally get End_of_file from input_line *)
                          ich # close_in();
                          och # close_out()
                      | error ->
                          (* Close both directions, then let the outer
                             handler log the error
                           *)
                          ich # close_in();
                          och # close_out();
                          raise error
                  )
                with
                  | error ->
                      Netlog.logf `Err
                        "Exception while echoing: %s" (Netexn.to_string error)
                      (* We could raise the exception here again, but
                         only after when_done has been called.
                       *)
              );
              (* Done with it, successfully or not: *)
              when_done()
          end
        )
    end
  )

(**********************************************************************)
(* "Operation"                                                        *)
(**********************************************************************)

(* operation_service_factory:

   Returns the processor factory named "operation_service", created
   with Rpc_netplex.rpc_factory. For every RPC server the program
   Operation_srv.P.V is bound with these procedures:
    - proc_null: does nothing
    - proc_operation: returns the argument string reversed
   The hooks are those of hello_hooks, i.e. every hook invocation is
   logged. There is nothing to configure (~configure returns unit).
 *)
let operation_service_factory() : Netplex_types.processor_factory =
  (* The reversed string is built with String.init. The former
     implementation filled a string created with String.create in place,
     which is not possible anymore since strings are immutable
     (String.create and String.set are gone in recent OCaml versions).
     For the empty string the result is again the empty string.
   *)
  let proc_operation s =
    let l = String.length s in
    String.init l (fun k -> s.[l-1-k]) in
  (* The second argument of setup is ignored here. *)
  let setup srv _ =
    Operation_srv.P.V.bind
      ~proc_null:(fun () -> ())
      ~proc_operation
      srv in
  Rpc_netplex.rpc_factory
    ~configure:(fun _ _ -> ())
    ~name:"operation_service"
    ~setup
    ~hooks:(fun _ -> new hello_hooks "operation_service")
    ()

(**********************************************************************)
(* Main                                                               *)
(**********************************************************************)

(* main:

   Parses the command line and starts Netplex with the three example
   services. The options consist of the four options defined here
   (-mt, -debug, -debug-all, -debug-list) followed by the options
   returned by Netplex_main.args. Anonymous arguments are rejected.
 *)
let main() =
  let (netplex_opts, cmdline_cfg) = Netplex_main.args() in
  let use_mt = ref false in

  (* Callback for -debug-list: never returns, the program exits here *)
  let list_debug_modules () =
    List.iter print_endline (Netlog.Debug.names());
    exit 0 in

  let local_opts =
    [ "-mt", Arg.Set use_mt,
      "  Use multi-threading instead of multi-processing";

      "-debug", Arg.String Netlog.Debug.enable_module,
      "<module>  Enable debug messages for <module>";

      "-debug-all", Arg.Unit Netlog.Debug.enable_all,
      "  Enable all debug messages";

      "-debug-list", Arg.Unit list_debug_modules,
      "  Show possible modules for -debug, then exit";
    ] in
  let all_opts = List.append local_opts netplex_opts in

  let usage =
    sprintf "usage: %s [options]" (Filename.basename Sys.argv.(0)) in
  let reject_anonymous arg =
    raise (Arg.Bad ("Don't know what to do with: " ^ arg)) in
  Arg.parse all_opts reject_anonymous usage;

  (* use_mt is only meaningful after Arg.parse has run *)
  let parallelizer =
    match !use_mt with
      | true  -> Netplex_mt.mt()     (* multi-threading *)
      | false -> Netplex_mp.mp() in  (* multi-processing *)

  (* All three services run in the same Netplex instance *)
  let services =
    [ empty_service_factory();
      echo_service_factory();
      operation_service_factory()
    ] in

  Netplex_main.startup
    parallelizer
    Netplex_log.logger_factories   (* allow all built-in logging styles *)
    Netplex_workload.workload_manager_factories (* ... all ways of workload management *)
    services
    cmdline_cfg


(* Program entry point: Netsys_signal.init is called first, and only
   then main is run.
 *)
let () =
  let () = Netsys_signal.init () in
  main ()