File: proxy_engine.ml

package info (click to toggle)
ocamlnet 4.1.9-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 54,024 kB
  • sloc: ml: 151,939; ansic: 11,071; sh: 2,003; makefile: 1,310
file content (246 lines) | stat: -rw-r--r-- 8,913 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
(* Advanced example for event-based engines. When receiving an HTTP request, the engine
 * connects to a remote TCP service, reads all data and forwards it to the HTTP client.
 * E.g. add to /etc/inetd.conf:
 *   99 stream tcp nowait nobody /bin/ps ps -ef
 * Then port 99 outputs a process listing.
 *
 * Use the URL /?host=<ip_address>&port=<port>
 *
 * You can also request arbitrary files: /?file=<filename>
 *
 * The whole example is realized with event-based I/O.
 *)

open Printf

let error_service_connection (cgi : Netcgi.cgi_activation) host port finish err =
  (* Failure handler for the connection to the proxied service.
   *
   * Logs a note on stdout, answers the HTTP client with a plain-text response
   * carrying the status [`Internal_server_error] and a message that names
   * [host], [port] and the exception [err], and finally calls [finish].
   *)
  printf "Error while connecting to service\n";
  flush stdout;
  cgi # set_header
    ~content_type:"text/plain"
    ~status: `Internal_server_error
    ();
  let reason = Printexc.to_string err in
  let body = sprintf "Could not connect to host <%s> port <%d>: %s\n" host port reason in
  let out = cgi # output in
  out # output_string body;
  finish()
;;


let on_service_connection env (cgi : Netcgi.cgi_activation) host port finish ues conn =
  (* Success handler for the connection to the proxied service: copies all
   * data from the connected descriptor to the asynchronous output channel
   * of [env], then calls [finish].
   *
   * Some versions of Equeue have a bug in the service connector, so the
   * connection is first verified with [Unix.getpeername]. If that call
   * raises, the failure is handed to [error_service_connection] and no
   * data is copied.
   *)
  let fd = Uq_client.client_endpoint conn in
  (* The argument is ignored, so one callback serves both final states.
   * An error of the copy engine is therefore not reported to anybody.
   *)
  let finish_copy _ = finish() in
  match Unix.getpeername fd with
  | exception err ->
      error_service_connection cgi host port finish err
  | _peer ->
      printf "Connected to service, copying data\n";
      flush stdout;
      (* The output header must be set before any data is copied. *)
      cgi # set_header ~content_type:"text/plain" ~status: `Ok ();
      let copy_engine =
        new Uq_transfer.receiver ~src:fd ~dst:env#output_ch_async ues in
      Uq_engines.when_state
        ~is_done:finish_copy
        ~is_error:finish_copy
        copy_engine
;;

let error_file_service (cgi : Netcgi.cgi_activation) filename finish err =
  (* Failure handler for the file service. It is called by
   * [setup_file_service] with whatever exception [err] was raised while
   * setting up the transfer of [filename].
   *
   * Logs a note on stdout, answers the HTTP client with a plain-text response
   * carrying the status [`Internal_server_error] and a message that names
   * [filename] and [err], and finally calls [finish].
   *)
  printf "Error opening file\n";
  flush stdout;
  cgi # set_header
    ~status: `Internal_server_error
    ~content_type:"text/plain"
    ();
  (* The message used to read "Could open file", i.e. the negation was missing. *)
  cgi # output # output_string (sprintf "Could not open file <%s> for reading: %s\n"
                                    filename (Printexc.to_string err));
  finish()
;;

let setup_file_service env (cgi : Netcgi.cgi_activation) filename finish ues =
  (* Serves the local file [filename] to the HTTP client: opens it read-only,
   * switches the descriptor to non-blocking mode, sets an [`Ok] plain-text
   * header and starts an engine that copies the file to the asynchronous
   * output channel of [env]. [finish] is called when the copy engine stops.
   *
   * Any exception raised during this setup is passed to
   * [error_file_service].
   *
   * NOTE(review): [fd] is never closed explicitly here. Presumably the
   * receiver engine closes its [src] descriptor when it stops - confirm
   * against the Uq_transfer documentation.
   *)
  (* The argument is ignored, so one callback serves both final states.
   * An error of the copy engine is therefore not reported to anybody.
   *)
  let finish_copy _ = finish() in
  let serve_file () =
    let fd = Unix.openfile filename [Unix.O_RDONLY] 0 in
    Unix.set_nonblock fd;
    (* The output header must be set before any data is copied. *)
    cgi # set_header ~content_type:"text/plain" ~status: `Ok ();
    let copy_engine =
      new Uq_transfer.receiver ~src:fd ~dst:env#output_ch_async ues in
    Uq_engines.when_state
      ~is_done:finish_copy
      ~is_error:finish_copy
      copy_engine
  in
  try serve_file ()
  with err -> error_file_service cgi filename finish err
;;

let on_request ues notification =
  (* Called when the full HTTP request has been received. Depending on the
   * CGI arguments, either a local file is served (argument "file") or a
   * connection to a remote TCP service is set up (arguments "host" and
   * "port"), whose data is then copied to our output channel.
   *
   * Any exception raised while setting this up is logged on stdout and the
   * request is finished without further processing.
   *
   * NOTE(review): the file name, host and port are taken unchecked from the
   * request, so a client can read arbitrary files and reach arbitrary TCP
   * endpoints. Acceptable for an example only.
   *)
  printf "Received HTTP request\n";
  flush stdout;
  let fin = notification # schedule_finish in
  let dispatch () =
    let env = notification # environment in
    (* Create [cgi] in mode [`Direct], i.e. without output buffering by [cgi].
     * This matters because [cgi # output] and [env # output_ch_async] are
     * then the same channel, and the latter is used for the gateway.
     *)
    let cgi =
      Netcgi_common.cgi_with_args
        (new Netcgi_common.cgi)
        (env :> Netcgi.cgi_environment)
        (`Direct "")
        env#input_channel
        (fun _ _ _ -> `Automatic) in
    let file = try Some( (cgi # argument "file")#value ) with Not_found -> None in
    let host = cgi # argument_value ~default:"127.0.0.1" "host" in
    (* 37 = time service, often available *)
    let port_arg = cgi # argument_value ~default:"37" "port" in
    let port = int_of_string port_arg in
    match file with
    | Some filename ->
        setup_file_service env cgi filename fin ues
    | None ->
        (* No file requested: connect to the remote service instead. *)
        let addr = Unix.inet_addr_of_string host in
        let sockspec = `Sock_inet(Unix.SOCK_STREAM, addr, port) in
        let opts = Uq_client.default_connect_options in
        let conn_engine = Uq_client.connect_e (`Socket(sockspec,opts)) ues in
        Uq_engines.when_state
          ~is_done:(on_service_connection env cgi host port fin ues)
          ~is_error:(error_service_connection cgi host port fin)
          conn_engine
  in
  try dispatch ()
  with e ->
    printf "Uncaught exception: %s\n" (Printexc.to_string e);
    flush stdout;
    notification # schedule_finish()
;;

let on_request_header ues 
                      (notification : Nethttpd_engine.http_request_header_notification) =
  (* Called after the HTTP header has been received. The HTTP body, if any
   * follows, is always accepted. No special processing of the body is set
   * up: it is just buffered until complete, and then [on_request] is
   * called.
   *
   * An advanced server could register a further notification for the HTTP
   * body that fires whenever new body data arrives, by calling
   * [notification # environment # input_ch_async # request_notification].
   *)
  let handle_full_request = on_request ues in
  printf "Received HTTP header\n";
  flush stdout;
  notification # schedule_accept_body ~on_request:handle_full_request ()
;;

let serve_connection ues fd =
  (* Creates the HTTP engine for the accepted connection [fd], after
   * switching the descriptor to non-blocking mode. The engine calls
   * [on_request_header] whenever an HTTP header has been received.
   *)
  printf "Connected\n";
  flush stdout;
  let config = Nethttpd_engine.default_http_engine_config in
  Unix.set_nonblock fd;
  let header_callback = on_request_header ues in
  (* The engine object itself is not needed afterwards, so it is dropped. *)
  ignore
    (new Nethttpd_engine.http_engine
       ~on_request_header:header_callback ()
       config fd ues)
;;

let rec accept ues srv_sock_acc =
  (* Accepts the next connection on the acceptor [srv_sock_acc]. Once a
   * connection has been accepted, it is handed to [serve_connection], and
   * the next connection is waited for by a recursive call of [accept].
   * Because [serve_connection] only sets up the callbacks needed for
   * serving, the recursive call follows immediately.
   *
   * The acceptor is shut down when accepting fails. It is also shut down
   * when it does not support multiple connections.
   *
   * NOTE(review): in the latter case the accepted descriptor is neither
   * served nor closed here - confirm that this is intended.
   *)
  let stop _ = srv_sock_acc # shut_down() in
  let on_accepted (fd, _fd_spec) =
    if srv_sock_acc # multiple_connections then begin
      serve_connection ues fd;
      accept ues srv_sock_acc
    end
    else
      srv_sock_acc # shut_down()
  in
  let acc_engine = srv_sock_acc # accept() in
  Uq_engines.when_state
    ~is_done:on_accepted
    ~is_error:stop
    acc_engine
;;

let start ?(port = 8765) () =
  (* Sets up a listener engine whose only purpose is to create a server
   * socket listening on [port] (default 8765) on any local address. When
   * the socket is set up, [accept] is called. Afterwards the main event
   * loop is run.
   *
   * A failure of the listener engine used to go unnoticed, because no error
   * callback was installed. It is now reported on stdout.
   *)
  printf "Listening on port %d\n" port;
  flush stdout;
  let ues = Unixqueue.create_unix_event_system () in
  (* Unixqueue.set_debug_mode true; *)
  let opts = { Uq_server.default_listen_options with
                 Uq_server.lstn_backlog = 20;
                 Uq_server.lstn_reuseaddr = true } in
  let lstn_engine =
    Uq_server.listener
      (`Socket(`Sock_inet(Unix.SOCK_STREAM, Unix.inet_addr_any, port) ,opts)) ues in
  Uq_engines.when_state
    ~is_done:(accept ues)
    ~is_error:(fun err ->
                 printf "Cannot listen on port %d: %s\n"
                   port (Printexc.to_string err);
                 flush stdout)
    lstn_engine;
  (* Start the main event loop. *)
  Unixqueue.run ues
;;

let conf_debug() =
  (* Configures debug logging from the environment. The variable DEBUG may
     be set to one of:
       - the keyword "ALL" to output all messages
       - the keyword "LIST" to print the list of modules and exit
       - a whitespace-separated list of Netlog module names to enable
     An unset DEBUG is treated like an empty list.
     If DEBUG_WIN32 is set (to any value), additional debugging for Win32 is
     enabled.
   *)
  let debug =
    match Sys.getenv "DEBUG" with
    | value -> value
    | exception Not_found -> "" in
  ( match debug with
    | "ALL" ->
        Netlog.Debug.enable_all()
    | "LIST" ->
        List.iter print_endline (Netlog.Debug.names());
        exit 0
    | _ ->
        let separator = Netstring_str.regexp "[ \t\r\n]+" in
        let modules = Netstring_str.split separator debug in
        List.iter
          (fun m -> Netlog.Debug.enable_module m)
          modules
  );
  match Sys.getenv "DEBUG_WIN32" with
  | _ -> Netsys_win32.Debug.debug_c_wrapper true
  | exception Not_found -> ()
;;


(* Program entry point: initialize signal handling, configure debugging
 * from the environment, then start the server.
 *)
let () =
  Netsys_signal.init();
  conf_debug();
  start()
;;