File: uq_mt.ml

package info (click to toggle)
ocamlnet 4.1.2-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 51,764 kB
  • ctags: 16,446
  • sloc: ml: 148,419; ansic: 10,989; sh: 1,885; makefile: 1,355
file content (124 lines) | stat: -rw-r--r-- 3,045 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
(* $Id$ *)

open Uq_engines.Operators

module Cond = struct
  type t = Netsys_oothr.condition
  let compare = compare
end


module CondSet = Set.Make(Cond)


type monitor =
    { esys : Unixqueue.event_system;
      mutable esys_running : bool;
      mutable waiters : CondSet.t;
        (* All threads waiting for an engine to complete have their own
	   condition variable. The variable is signalled when the engine
	   is finished (unless it is this thread that runs the esys).
	   We collect all these variables for another purpose, though.
	   When the current esys runner is done, another thread must be
	   woken up. We just take one condition from this set. 
	   The alternative would be to use a single condition variable
	   for all conditions. This is simpler, but the downside is that
	   we need then to broadcast it from time to time.
	 *)
      mutex : Netsys_oothr.mutex;
    }

exception Esys_exit


let create_monitor esys =
  { esys = esys;
    esys_running = false;
    waiters = CondSet.empty;
    mutex = !Netsys_oothr.provider # create_mutex()
  }


let monitor_run mon f arg =
  let cond = !Netsys_oothr.provider # create_condition() in
  let result = ref None in
  let esys_owner = ref false in
  let g = Unixqueue.new_group mon.esys in

  Unixqueue.once mon.esys g 0.0
    (fun () ->
       let inner_e =
	 try f mon.esys arg
	 with error -> eps_e (`Error error) mon.esys in
       ignore(
	 inner_e
	 >> (fun st ->
	       mon.mutex # lock();
	       result := Some st;
	       if !esys_owner then
		 Unixqueue.once mon.esys g 0.0 (fun () -> raise Esys_exit)
	       else
		 cond # signal();
	       mon.mutex # unlock();
	       `Done ()
	    )
       )
    );

  mon.mutex # lock();
  while !result = None do
    while !result = None && mon.esys_running do
      mon.waiters <- CondSet.add cond mon.waiters;
      cond # wait mon.mutex;
      mon.waiters <- CondSet.remove cond mon.waiters;
    done;
    if !result = None then (
      mon.esys_running <- true;
      esys_owner := true;
      mon.mutex # unlock();
      ( try
	  Unixqueue.run mon.esys
	with
	  | Esys_exit -> ()
	  | error ->
	      Netlog.logf `Crit
		"Uq_mt.monitor: caught exception: %s"
		(Netexn.to_string error)
      );
      mon.mutex # lock();
      esys_owner := false;
      mon.esys_running <- false
    )
  done;
  if not mon.esys_running && mon.waiters <> CondSet.empty then
    (CondSet.choose mon.waiters) # signal();
  mon.mutex # unlock();

  match !result with
    | Some(`Done x) -> x
    | Some(`Error e) -> raise e
    | Some(`Aborted) -> failwith "Uq_mt.monitor: aborted"
    | None -> assert false


let monitor_do mon f arg =
  monitor_run mon 
    (fun esys arg -> 
       let result = f arg in 
       eps_e (`Done result) esys
    )
    arg

let monitor_async mon f arg =
  monitor_run mon
    (fun esys arg ->
       let e, signal = Uq_engines.signal_engine esys in
       let emit get_result =
	 signal
	   ( try `Done(get_result())
	     with error -> `Error error
	   ) in
       f arg emit;
       e
    )
    arg