File: threadext.ml

package info (click to toggle)
xen-api-libs 0.5.2-3
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 1,940 kB
  • sloc: ml: 13,925; sh: 2,930; ansic: 1,699; makefile: 1,240; python: 83
file content (411 lines) | stat: -rw-r--r-- 12,533 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
(*
 * Copyright (C) 2006-2009 Citrix Systems Inc.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; version 2.1 only. with the special
 * exception on linking described in file LICENSE.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *)

module Mutex = struct
	include Mutex

	(** [execute lock f] runs [f ()] while holding [lock] and returns its
	    result. The lock is released before [execute] returns, whether [f]
	    returns normally or raises; an exception raised by [f] is re-raised
	    to the caller after the unlock. *)
	let execute lock f =
		Mutex.lock lock;
		(* Capture the outcome first so that there is a single unlock site *)
		let outcome = try `Value (f ()) with exn -> `Raised exn in
		Mutex.unlock lock;
		match outcome with
		| `Value r -> r
		| `Raised exn -> raise exn
end


module Alarm = struct

	(** A set of timed callbacks served by a single watcher thread.
	    - [token] guards [queue] and [notifier];
	    - [queue] holds [(deadline, callback)] pairs; [register] keeps it
	      sorted by deadline, earliest first;
	    - [notifier] is the pipe [(read end, write end)] used to wake the
	      watcher thread; it is [None] while no watcher thread exists. *)
	type t =
		  { token: Mutex.t ;
		    mutable queue: (float * (unit -> unit)) list ;
		    mutable notifier: (Unix.file_descr * Unix.file_descr) option ;
		  }

	(** [create ()] returns an empty alarm with no watcher thread. *)
	let create () =
		{ token = Mutex.create () ;
		  queue = [] ;
		  notifier = None ;
		}

	(** Alarm used by [register] when no [?alarm] argument is given. *)
	let global_alarm = create ()

	(** Body of the watcher thread. Each round it drains the wake-up pipe,
	    starts one helper thread per callback whose deadline has passed, then
	    sleeps until the next deadline or until [register] writes to the
	    pipe. When the queue becomes empty it closes the pipe, clears
	    [alarm.notifier] and terminates the thread.

	    Deadlines are compared against [Unix.gettimeofday], which has
	    sub-second resolution; [Unix.time] only advances in whole seconds and
	    would let a callback fire up to about one second late. *)
	let rec watch alarm =
		match alarm.notifier with
		| None -> assert false (* [register] sets the notifier before starting us *)
		| Some (pipe_in, pipe_out) ->
			(* Swallow every wake-up byte that is already waiting *)
			while Thread.wait_timed_read pipe_in 0. do
				ignore (Unix.read pipe_in " " 0 1)
			done;
			let next = Mutex.execute alarm.token
				(fun () ->
					let now = Unix.gettimeofday () in
					let waiting, due =
						List.partition (fun (clock, _) -> clock > now) alarm.queue in
					(* Create helper thread in case callback could block us *)
					List.iter
						(fun (_, callback) -> ignore (Thread.create callback ()))
						due;
					alarm.queue <- waiting;
					match waiting with
					| [] ->
						(* Nothing left to wait for: tear the pipe down so that
						   the next [register] starts a fresh watcher *)
						Unix.close pipe_out;
						Unix.close pipe_in;
						alarm.notifier <- None;
						None
					| (c, _) :: _ ->
						Some c) in
			match next with
			| None -> Thread.exit ()
			| Some c ->
				let now = Unix.gettimeofday () in
				(* Sleep until the earliest deadline, or until [register]
				   writes a byte to the pipe *)
				if c > now then ignore (Thread.wait_timed_read pipe_in (c -. now));
				watch alarm

	(** [register ?alarm time callback] arranges for [callback ()] to be run
	    in its own thread once the clock reaches [time], an absolute time in
	    seconds on the same scale as [Unix.gettimeofday]. The watcher thread
	    is started on demand, or woken through the pipe if it already
	    exists.
	    @param alarm the alarm to register with (default [global_alarm]) *)
	let register ?(alarm = global_alarm) time callback =
		Mutex.execute alarm.token
			(fun () ->
				let nqueue = (time, callback) :: alarm.queue in
				alarm.queue <- List.sort (fun x1 x2 -> compare (fst x1) (fst x2)) nqueue;
				match alarm.notifier with
				| Some (_, pipe_out) ->
					(* A watcher is already running: wake it so that it
					   re-reads the queue *)
					ignore (Unix.write pipe_out "X" 0 1)
				| None ->
					let pipe_in, pipe_out = Unix.pipe () in
					alarm.notifier <- Some (pipe_in, pipe_out);
					ignore (Thread.create watch alarm))
end

module Thread = struct

	(* Handle on a thread created through this module: [Running] once the
	   underlying thread has been created, [Pending] while its creation is
	   still queued by the scheduler. *)
	type t =
		| Running of Thread.t
		| Pending of pthread
	(* A queued thread: (deadline, sequence number, suspended creation).
	   The deadline is an absolute time on the [Unix.gettimeofday] scale, or
	   [max_float] when there is none. Forcing the lazy value creates the
	   underlying thread. *)
	and pthread = float * int * Thread.t lazy_t

	(* How long a new thread may be kept waiting: not at all, at most the
	   given number of seconds, or without limit. *)
	type schedule = Now | Timeout of float | Indefinite

	(* Scheduling policy consulted by [create] and [run_pendings]:
	   - [AlwaysRun]: every thread starts immediately;
	   - [MaxCapacity (max_threads, max_wait_opt)]: a thread starts
	     immediately only while fewer than [max_threads] are running;
	     [max_wait_opt], when given, caps the waiting time of queued threads;
	   - [WaitCondition f]: [f ()] decides each time it is consulted. *)
	type policy =
		| AlwaysRun
		| MaxCapacity of int * float option
		| WaitCondition of (unit -> schedule)

	(* Sequence number given to the most recently queued thread *)
	let count = ref 0

	(* Queue of pending threads, ordered by deadline and then by sequence
	   number *)
	module PQueue = Set.Make(struct type t = pthread let compare = compare end)

	(* Number of threads started by the scheduler that have not yet gone
	   through [exit] *)
	let running = ref 0

	let pqueue = ref PQueue.empty

	(* This info can be deduced from pqueue, but having a specific int val allow
	   us to inspect it with lower cost and be lock free *)
	let pending = ref 0

	(** Number of threads currently accounted as running. *)
	let running_threads () = !running

	(** Number of threads currently queued. *)
	let pending_threads () = !pending

	(* Guards [running], [pending], [pqueue], [policy], [pivot], [pre_pivot] *)
	let scheduler_token = Mutex.create ()

	let policy = ref AlwaysRun

	(* Should be protected by scheduler_token.
	   Dequeue [t] if it is still queued and start it unless that has already
	   happened. *)
	let run_thread ((_, _, pt) as t) =
		(* Might have run by other scheduling policy *)
		if PQueue.mem t !pqueue then
			(pqueue := PQueue.remove t !pqueue; decr pending);
		if not (Lazy.lazy_is_val pt) then
			let _ = Lazy.force pt in
			incr running

	(* Placeholder pivot that is never queued; its lazy thread is not forced
	   by the code in this module *)
	let fake_pivot = max_float, 0, lazy (Thread.create ignore ())
	(* Queued thread currently watched by the anti-starvation logic *)
	let pivot = ref fake_pivot
	(* Number of queued threads ordered before [pivot] at the last check *)
	let pre_pivot = ref max_int

	(* Should be protected by scheduler_token, this could be triggered either
	   because a thread finishes running and hence possibly provide an running
	   slot, or the scheduling policy has been updated hence more opportunities
	   appear.

	   The current time is read with [Unix.gettimeofday]: [Unix.time] only
	   advances in whole seconds, which would make deadlines look unexpired
	   for up to a second after they have passed. *)
	let rec run_pendings () =
		if not (PQueue.is_empty !pqueue) then
			let now = Unix.gettimeofday () in
			let (c, _, _) as t = PQueue.min_elt !pqueue in
			(* Just in case policy has been changed *)
			let to_run = match !policy with
				| AlwaysRun -> true
				| MaxCapacity (max_threads, _) -> c <= now || !running < max_threads
				| WaitCondition f -> f () = Now in
			if to_run then (run_thread t; run_pendings ())
			else (* extra logic to avoid starvation or wrongly programmed deadlock *)
				(* [timeouts] are the queued threads ordered before the pivot,
				   [indefs] those ordered after it. The pivot is replaced when
				   it is no longer queued, or when the number of threads ahead
				   of it has not decreased since the previous check; in that
				   second case the old pivot is started regardless of policy. *)
				let timeouts, exist, indefs = PQueue.split !pivot !pqueue in
				if not exist || (PQueue.cardinal timeouts >= !pre_pivot
				                 && (run_thread !pivot; true)) then
					pivot :=
						if PQueue.is_empty indefs then fake_pivot
						else PQueue.min_elt indefs;
				pre_pivot := PQueue.cardinal timeouts

	(* Scheduler-aware termination: give the running slot back, start any
	   pending thread that may now run, then end the calling thread. Used as
	   the finaliser of every thread started by [create]. *)
	let exit () =
		Mutex.execute scheduler_token
			(fun () -> decr running; run_pendings ());
		Thread.exit ()

	(** [set_policy p] installs policy [p] and immediately starts any pending
	    thread the new policy allows. *)
	let set_policy p =
		Mutex.execute scheduler_token
			(fun () ->
				 policy := p;
				 run_pendings ())

	(** [create ?schedule f x] asks the scheduler to run [f x] in a new
	    thread. Depending on [schedule] and on the current policy the thread
	    is either started at once (result [Running _]) or queued (result
	    [Pending _]). A queued thread with a finite waiting time is started
	    through [Alarm] when its deadline is reached, whatever the policy
	    says at that point.
	    @param schedule longest time the thread may be kept waiting
	    (default [Indefinite]) *)
	let create ?(schedule=Indefinite) f x =
		(* [Pervasiveext.finally] is defined outside this file; it is expected
		   to call [exit] once [f x] has returned or raised *)
		let f' x =
			Pervasiveext.finally
				(fun () -> f x)
				exit in
		Mutex.execute scheduler_token
			(fun () ->
				 run_pendings ();
				 (* Longest wait requested by the caller... *)
				 let timeout = match schedule with
					 | Now -> 0.
					 | Timeout t -> t
					 | Indefinite -> max_float in
				 (* ...then possibly shortened by the policy *)
				 let timeout =
					 if timeout = 0. then 0. else
						 match !policy with
						 | AlwaysRun -> 0.
						 | MaxCapacity (max_threads, max_wait_opt) ->
							   if !running < max_threads && PQueue.is_empty !pqueue then 0.
							   else begin match max_wait_opt with
							   | None -> timeout
							   | Some t -> min timeout t end
						 | WaitCondition f -> match f () with
						   | Now -> 0.
						   | Timeout t -> min t timeout
						   | Indefinite -> timeout in
				 if timeout <= 0. then
					 let t = Thread.create f' x in
					 incr running;
					 Running t
				 else
					 (* Sub-second clock, so that short timeouts are not rounded
					    to a whole-second boundary *)
					 let deadline =
						 if timeout < max_float then timeout +. Unix.gettimeofday ()
						 else max_float in
					 let pt = lazy (Thread.create f' x) in
					 incr count;
					 (* Wrap the sequence number around before it overflows *)
					 if !count = max_int then count := 0;
					 let t = (deadline, !count, pt) in
					 pqueue := PQueue.add t !pqueue;
					 incr pending;
					 if deadline < max_float then
						 Alarm.register deadline
							 (fun () -> Mutex.execute scheduler_token
								  (fun () -> run_thread t));
					 (* It's fine that a pended thread might get scheduled later on so
					    that the information held in 't' becomes meaningless. This is
					    comparable to the case that a Thread.t finishes running and its
					    thread id still exists.
					 *)
					 Pending t)

	(** Handle of the calling thread. *)
	let self () =
		(* When we get here, the thread must be running *)
		Running (Thread.self ())

	(** Identifier of a thread handle. A [Pending] handle keeps returning
	    the same negative number, even after the thread has been started. *)
	let id = function
		| Running t -> Thread.id t
		| Pending (_, id, _) ->
			  (* Pending thread have a negative id to avoid overlapping with running
			     thread id *)
			  -id

	(** [join t] waits for [t] to terminate. A thread that is still queued is
	    started first. *)
	let join = function
		| Running t -> Thread.join t
		| Pending ((_, _, pt) as t) ->
			  if not (Lazy.lazy_is_val pt) then begin
				  (* Give priority to those to be joined *)
				  Mutex.execute scheduler_token (fun () -> run_thread t);
				  assert (Lazy.lazy_is_val pt);
			  end;
			  Thread.join (Lazy.force pt)

	(** [kill t] kills a started thread through [Thread.kill]; a thread that
	    is still queued is simply removed from the queue and never started. *)
	let kill = function
		| Running t ->
			  (* Not implemented in stdlib *)
			  Thread.kill t
		| Pending ((_, _, pt) as t) ->
			  if Lazy.lazy_is_val pt then
				  Thread.kill (Lazy.force pt)
			  else
				  Mutex.execute scheduler_token
					  (fun () ->
						   (* Just in case something happens before we grab the lock *)
						   if Lazy.lazy_is_val pt then Thread.kill (Lazy.force pt)
						   else (pqueue := PQueue.remove t !pqueue; decr pending))

	(* Plain re-exports of the standard [Thread] functions *)
	let delay = Thread.delay
	(* NOTE(review): this rebinding shadows the scheduler-aware [exit] defined
	   above, so the exported [exit] does not update [running]; confirm that
	   this is intended. *)
	let exit = Thread.exit
	let wait_read = Thread.wait_read
	let wait_write = Thread.wait_write
	let wait_timed_read = Thread.wait_timed_read
	let wait_timed_write = Thread.wait_timed_write
	let wait_pid = Thread.wait_pid
	let select = Thread.select
	let yield = Thread.yield
	let sigmask = Thread.sigmask
	let wait_signal = Thread.wait_signal
end


(** Create thread loops which periodically apply a function.

    [Tr.t] is the type of the references identifying loops and
    [Tr.delay ()] gives the number of seconds a loop sleeps before each
    application of its function. *)
module Thread_loop
  : functor (Tr : sig type t val delay : unit -> float end) ->
    sig
      val start : Tr.t -> (unit -> unit) -> unit
      val stop : Tr.t -> unit
      val update : Tr.t -> (unit -> unit) -> unit
    end
  = functor (Tr: sig type t val delay : unit -> float end) -> struct

    (* Raised inside a loop thread to leave its endless loop *)
    exception Done_loop

    (* Registered loops: reference -> (loop mutex, loop thread, exit flag).
       A reference may have several bindings at once (for instance while a
       stopped loop is still sleeping); [Hashtbl.find] then returns the most
       recently started one. *)
    let ref_table : ((Tr.t,(Mutex.t * Thread.t * bool ref)) Hashtbl.t) =
        Hashtbl.create 1

    (* Serialises every access to [ref_table], which is touched both by the
       callers of [start]/[stop] and by the cleanup threads *)
    let table_m = Mutex.create ()

    (* Remove exactly the binding that owns [exit_var] and leave the other
       bindings of [xref] in their original order. A plain [Hashtbl.remove]
       would drop the most recent binding, which belongs to a different loop
       whenever [xref] has been restarted in the meantime. *)
    let unregister xref exit_var =
        Mutex.execute table_m (fun () ->
            (* Most recent binding first *)
            let bindings = Hashtbl.find_all ref_table xref in
            List.iter (fun _ -> Hashtbl.remove ref_table xref) bindings;
            (* Re-add oldest first so that the newest ends up on top again;
               the exit flag is unique to each loop, hence the physical
               comparison *)
            List.iter (fun ((_, _, flag) as binding) ->
                if flag != exit_var then Hashtbl.add ref_table xref binding
            ) (List.rev bindings))

    (** Create a thread which periodically applies a function to the
        reference specified, and exits cleanly when removed. The loop is
        registered before [start] returns, so a subsequent [stop] on the
        same reference always finds it. *)
    let start xref fn =
        let mut = Mutex.create () in
        let exit_var = ref false in
        (* create thread which periodically applies the function *)
        let tid = Thread.create (fun () ->
            try
                while true do
                    Thread.delay (Tr.delay ());
                    Mutex.execute mut (fun () ->
                        if !exit_var then raise Done_loop;
                        fn ())
                done
            with Done_loop -> ()
        ) () in
        Mutex.execute table_m (fun () ->
            Hashtbl.add ref_table xref (mut, tid, exit_var));
        (* create thread to clean the reference table up safely once the
           delay thread has terminated *)
        ignore (Thread.create (fun () ->
            Thread.join tid;
            unregister xref exit_var
        ) ())

    (** Ask the most recently started loop of [xref] to terminate; the loop
        notices the request after its current sleep. Does nothing when no
        loop is registered for [xref]. *)
    let stop xref =
        let entry = Mutex.execute table_m (fun () ->
            try Some (Hashtbl.find ref_table xref) with Not_found -> None) in
        match entry with
        | None -> ()
        | Some (mut, _, exit_ref) ->
            (* Taking the loop mutex waits for an application of the
               function that is in progress *)
            Mutex.execute mut (fun () -> exit_ref := true)

    (** Replace a thread with another one *)
    let update xref fn =
        stop xref;
        start xref fn
end

(** Parallel [List.iter]: applies [f] to every element of [xs], each in its
    own thread, and waits for all of them to finish. Returns an association
    list pairing each input whose application raised with the exception it
    raised; inputs whose application succeeded do not appear in the result. *)
let thread_iter_all_exns f xs =
  let failures = ref [] in
  let failures_m = Mutex.create () in
  (* Several workers may fail at the same time, hence the lock *)
  let record x e =
    Mutex.execute failures_m (fun () -> failures := (x, e) :: !failures) in
  let worker x () = try f x with e -> record x e in
  let spawn x = Thread.create (worker x) () in
  (* Start every worker before waiting for any of them *)
  let threads = List.map spawn xs in
  List.iter Thread.join threads;
  !failures

(** Parallel [List.iter]. When at least one application of [f] raised, one of
    the collected exceptions (the first in the list returned by
    [thread_iter_all_exns]) is re-raised once all threads have finished. *)
let thread_iter f xs =
  (* Raising on the first element stops the iteration; an empty list of
     failures simply yields unit *)
  let reraise (_, e) = raise e in
  List.iter reraise (thread_iter_all_exns f xs)

module Delay = struct
  (* An interruptible sleep. While a thread is blocked in [wait], the record
     holds both ends of the pipe used to wake it up. *)
  type t = { 
    (* End of the pipe the waiting thread reads from: *)
    mutable pipe_out: Unix.file_descr option;
    (* End of the pipe [signal] writes to: *)
    mutable pipe_in: Unix.file_descr option;
    (* Set when a signal arrived while nobody was waiting: *)
    mutable signalled: bool;
    (* Guards the three fields above: *)
    m: Mutex.t
  }

  (** [make ()] returns a delay with no waiter and no stored signal. *)
  let make () =
    let m = Mutex.create () in
    { pipe_out = None; pipe_in = None; signalled = false; m = m }

  exception Pre_signalled

  (** [wait x seconds] blocks for at most [seconds] seconds. Returns [true]
      when the whole time elapsed and [false] when the wait was cut short by
      [signal], including a signal stored before the call. *)
  let wait (x: t) (seconds: float) =
    (* Descriptors opened by this call that still have to be closed *)
    let opened = ref [] in
    let release fd =
      let still_open = List.mem fd !opened in
      if still_open then Unix.close fd;
      opened := List.filter (fun other -> fd <> other) !opened in
    (* Runs under [x.m]: consume a stored signal, or publish a fresh pipe *)
    let arm () =
      if x.signalled then begin
        x.signalled <- false;
        raise Pre_signalled
      end;
      let read_end, write_end = Unix.pipe () in
      (* both ends are closed by [disarm] below *)
      opened := [ read_end; write_end ];
      x.pipe_out <- Some read_end;
      x.pipe_in <- Some write_end;
      x.signalled <- false;
      read_end in
    let sleep () =
      try
        let read_end = Mutex.execute x.m arm in
        begin match Unix.select [ read_end ] [] [] seconds with
        | [], _, _ ->
            (* nothing to read: the full time elapsed *)
            true
        | _ :: _, _, _ ->
            (* woken up: flush the single byte from the pipe *)
            ignore (Unix.read read_end (String.create 1) 0 1);
            false
        end
      with Pre_signalled -> false in
    (* Runs under [x.m]: unpublish the pipe and close what was opened *)
    let disarm () =
      Mutex.execute x.m (fun () ->
        x.pipe_out <- None;
        x.pipe_in <- None;
        List.iter release !opened) in
    Pervasiveext.finally sleep disarm

  (** [signal x] wakes the thread blocked in [wait x]. If no thread is
      waiting, the signal is stored and makes the next [wait x] return
      immediately. *)
  let signal (x: t) =
    let notify () =
      match x.pipe_in with
      | None -> x.signalled <- true
      | Some fd -> ignore (Unix.write fd "X" 0 1) in
    Mutex.execute x.m notify
end

(** Block the calling thread forever by sleeping in an endless loop. *)
let keep_alive () =
	(* The [unit] annotation pins the result type; an unannotated endless
	   recursion would be inferred as returning ['a] *)
	let rec nap () : unit =
		Thread.delay 20000.;
		nap () in
	nap ()