File: http_mt.ml

package info (click to toggle)
ocamlnet 2.2.9-8
  • links: PTS, VCS
  • area: main
  • in suites: squeeze
  • size: 17,724 kB
  • ctags: 10,053
  • sloc: ml: 63,928; ansic: 1,973; makefile: 800; sh: 651
file content (153 lines) | stat: -rw-r--r-- 4,201 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
(* HTTP Pipelines and multithreading
 *
 *
 * This is the recommended solution for multi-threaded apps:
 * A designated HTTP thread handles all HTTP requests, and the other threads
 * of the program send their HTTP requests to the HTTP thread. The HTTP
 * thread can process several requests in parallel.
 *)

(* Compile with:
 * ocamlfind ocamlopt -o t -package netclient,threads -linkpkg -thread http_mt.ml
 *)


open Http_client

exception HTTP_Job of http_call * (http_call -> unit)
  (* HTTP_Job is never raised. It is declared as an exception only so
   * that the pair (call, f_done) can be wrapped in [Unixqueue.Extra]
   * and pushed onto the event queue, which is how another thread hands
   * the HTTP request [call] over to the HTTP thread.
   *
   * When the request has been processed, the function [f_done] is
   * called with the call object. Note that [f_done] runs in the
   * context of the HTTP thread, not of the thread that submitted the
   * job, so it must arrange some synchronisation with the submitting
   * thread to return the result.
   *)


(* The event system used for HTTP processing. It stays [None] until
 * [http_init] stores a value, and is reset by [shutdown_http_thread].
 *)
let http_esys = ref None

(* Returns the event system stored in [http_esys].
 * Raises [Failure "No event system"] while nothing is stored.
 *)
let get_http_esys () =
  match !http_esys with
  | Some event_system -> event_system
  | None -> failwith "No event system"

(* The Unixqueue group holding the keep-alive resource of the HTTP
 * thread. It stays [None] until [http_init] stores a value, and is
 * reset by [shutdown_http_thread].
 *)
let http_keep_alive_group = ref None

(* Returns the group stored in [http_keep_alive_group].
 * Raises [Failure "No keep alive group"] while nothing is stored.
 *)
let get_http_keep_alive_group () =
  match !http_keep_alive_group with
  | Some group -> group
  | None -> failwith "No keep alive group"


(* Creates a fresh Unix event system together with a new group in it,
 * and publishes both through [http_esys] and [http_keep_alive_group].
 * Must be called before [http_thread], [caller_thread] or
 * [shutdown_http_thread], which all read these references.
 *)
let http_init () =
  let event_system = Unixqueue.create_unix_event_system () in
  let group = Unixqueue.new_group event_system in
  http_esys := Some event_system;
  http_keep_alive_group := Some group
;;


(* Body of the designated HTTP thread. It attaches an HTTP pipeline to
 * the shared event system, turns every incoming HTTP_Job event into a
 * pipeline request, and then runs the event loop until it terminates.
 * Raises [Failure] if [http_init] has not been called before.
 *)
let http_thread () =
  (* The pipeline works on the event system prepared by [http_init]. *)
  let event_system = get_http_esys () in
  let http_pipeline = new pipeline in
  http_pipeline # set_event_system event_system;

  (* Without any resource the event loop would stop as soon as there is
   * no HTTP request left to process. An artificial wait resource with
   * the timeout -1.0 (i.e. it never times out) keeps the loop alive.
   * The resource belongs to the keep-alive group, so clearing that
   * group removes it again.
   *)
  let group = get_http_keep_alive_group () in
  let wait_id = Unixqueue.new_wait_id event_system in
  Unixqueue.add_resource event_system group (Unixqueue.Wait wait_id, (-1.0));

  (* Event handler: an HTTP_Job starts a new HTTP call on the pipeline;
   * every other event is rejected because it is not meant for us.
   *)
  let dispatch _ _ = function
    | Unixqueue.Extra (HTTP_Job (call, f_done)) ->
        http_pipeline # add_with_callback call f_done
    | _ ->
        raise Equeue.Reject
  in
  Unixqueue.add_handler event_system group dispatch;

  (* Run the event loop. It returns when all jobs are done and the
   * keep-alive group has been cleared.
   *)
  Unixqueue.run event_system
;;


(* Asks the HTTP thread to terminate: clearing the keep-alive group
 * removes the artificial wait resource, so the event loop can return
 * once the remaining work is done. Afterwards the global references are
 * reset, i.e. [http_init] is required before the next use.
 * Raises [Failure] if [http_init] has not been called before.
 *)
let shutdown_http_thread () =
  let event_system = get_http_esys () in
  let group = get_http_keep_alive_group () in
  Unixqueue.clear event_system group;
  List.iter (fun reset -> reset ())
    [ (fun () -> http_keep_alive_group := None);
      (fun () -> http_esys := None) ]
;;


(* Body of a client thread: submits one GET request for
 * "http://localhost/" to the HTTP thread, blocks until the request has
 * been processed, and prints the response body (or "some problem" if
 * the call was not successful) on stdout.
 * Raises [Failure] if [http_init] has not been called before.
 *)
let caller_thread() =
  let esys = get_http_esys() in
  let mutex = Mutex.create() in
  let cond = Condition.create () in
  let call = new get "http://localhost/" in
  (* [None] while the request is pending, [Some text] once it is done.
   * This is the predicate protected by [mutex]: the waiting side below
   * checks it instead of relying on the signal alone.
   *)
  let result = ref None in
  let f_done call =
    (* This function is called from the scope of the HTTP thread!
     * Store the outcome and signal the calling thread that the call is
     * done:
     *)
    Mutex.lock mutex;
    result := Some ( match call # status with
                       | `Successful ->
                           call # response_body # value
                       | _ ->
                           "some problem"
                   );
    Condition.signal cond;
    Mutex.unlock mutex
  in
  Unixqueue.add_event esys (Unixqueue.Extra (HTTP_Job(call, f_done)));
  (* Wait until the result is available. The HTTP thread may already
   * have run [f_done] before we get here, in which case the signal was
   * sent while nobody was waiting; checking [result] first avoids
   * blocking forever in that case. Looping also makes the wait robust
   * against spurious wakeups.
   *)
  Mutex.lock mutex;
  let rec await () =
    match !result with
      | Some text -> text
      | None -> Condition.wait cond mutex; await ()
  in
  let text = await () in
  Mutex.unlock mutex;
  print_endline text;
  flush stdout
;;


(* Program entry point: sets up the shared event system, starts the
 * HTTP thread and 100 caller threads, waits for all callers, and then
 * shuts the HTTP thread down.
 *)
let () =
  (* Unixqueue.set_debug_mode true; *)

  (* The event system must exist before any thread is started: *)
  http_init ();

  (* The designated HTTP thread: *)
  let http_worker = Thread.create http_thread () in

  (* Start [remaining] more caller threads; each new thread is consed
   * onto [started], so the most recently created thread comes first.
   *)
  let rec spawn_callers remaining started =
    if remaining <= 0 then started
    else
      let thr = Thread.create caller_thread () in
      spawn_callers (remaining - 1) (thr :: started)
  in
  let callers = spawn_callers 100 [] in

  (* Wait until every caller has got its result: *)
  List.iter Thread.join callers;

  (* Only now is it safe to stop the HTTP thread; wait until it is done *)
  shutdown_http_thread ();
  Thread.join http_worker
;;