File: task_runner.ml

package info (click to toggle)
marionnet 0.90.6+bzr508-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 9,532 kB
  • sloc: ml: 18,130; sh: 5,384; xml: 1,152; makefile: 1,003; ansic: 275
file content (217 lines) | stat: -rw-r--r-- 7,078 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
(* This file is part of Marionnet, a virtual network laboratory
   Copyright (C) 2007, 2008  Luca Saiu
   Copyright (C) 2009, 2010  Jean-Vincent Loddo
   Copyright (C) 2007, 2008, 2009, 2010  Université Paris 13

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>. *)

open Graph;;
open Message_passing;;

(** [do_in_parallel thunks] runs every thunk of the list in a thread of its
    own and returns only once all of those threads have been joined.
    An exception escaping a thunk is logged and otherwise ignored, so a
    failing thunk never prevents the other threads from being waited for. *)
let do_in_parallel thunks =
  (* Body of each worker thread: run the thunk, logging any failure. *)
  let guarded thunk () =
    try thunk ()
    with e ->
      Log.printf1 "!!!! do_in_parallel: a thunk failed (%s)\n" (Printexc.to_string e)
  in
  (* Joining is not expected to fail; log it if it ever does. *)
  let join_logging_failure worker =
    try Thread.join worker
    with e ->
      Log.printf1 "!!!!!!!!!!!!!!! This should not happen: join failed (%s)\n"
        (Printexc.to_string e)
  in
  (* Start all the threads first, then wait for each of them in turn: *)
  let workers =
    List.map (fun thunk -> Thread.create (guarded thunk) ()) thunks in
  List.iter join_logging_failure workers;;

(** A thunk is trivially represented as a unit->unit function: *)
type thunk =
    unit -> unit;;

(** A task is a thunk, to be executed by the task-runner thread: *)
type task = thunk;;

(** A graph of tasks combines tasks with their dependency relation.
    NOTE(review): [graph] is not defined in this file; presumably it comes
    from one of the modules opened above -- confirm. *)
type task_graph =
    thunk graph;;

(** This is only used internally: the [run] method of [task_runner] catches
    it and stops the task-runner loop instead of reporting a task failure. *)
exception Kill_task_runner;;

(** This class allows the user to enqueue tasks (represented as thunks) to
    be executed one after another, all in the same worker thread, which is
    created at initialization time. *)
class task_runner = object(self)
  (** Pending tasks, each paired with the name used for it in log messages. *)
  val queue : (string * (unit -> unit)) queue = new queue

  (** Loop condition of the worker thread; [run] clears it when a task
      raises [Kill_task_runner]. *)
  val run_again = ref true

  initializer
(*     (\** Handle SIGCHLD by default: *\) *)
(*     Signals.install_sigchld_handler (); *) (* No, absolutely not! :-) *)
    (* Start the worker thread: it repeatedly takes the next task from the
       queue and runs it, until [run_again] becomes false. *)
    ignore (Thread.create
              (fun () ->
                while !run_again do
                  Log.printf "task_runner: I'm ready for the next task...\n";
                  let name, task = queue#dequeue in
                  self#run name task;
                done)
              ())

  (** Run [task], logging under [name] that it starts and how it ends.
      [Kill_task_runner] makes the worker loop stop after this task; any
      other exception is logged as a warning and swallowed, so that the
      following tasks still get executed. *)
  method run name task =
    (* Flush pending output before the task starts: *)
    flush_all ();
    try
      Log.printf1 "task_runner: Executing the task \"%s\"\n" name;
      task ();
      Log.printf1 "task_runner: The task \"%s\" succeeded.\n" name;
    with Kill_task_runner -> begin
      Log.printf "task_runner: The task runner was explicitly killed.\n";
      run_again := false;
    end
    | e -> begin
      Log.printf2
        "task_runner: WARNING: the asynchronous task \"%s\" raised an exception\n\t(%s).\n\tTHIS MAY BE SERIOUS.\n\tAnyway, I'm going to continue with the next task.\n"
        name
        (Printexc.to_string e) ;
    end

  (** Hand [task] to the queue through its [enqueue] method. [name] is only
      used in log messages. *)
  method schedule ?(name="[unnamed task]") task =
    queue#enqueue (name, task)

  (** Like [schedule], but through the queue's [prepend] method (presumably
      putting the task ahead of the already scheduled ones -- the queue class
      is defined elsewhere). *)
  method prepend ?(name="[unnamed task]") task =
    queue#prepend (name, task)

  (** This queue is only used for synchronization purposes. Message-passing
      synchronizations is way spiffier :-) *)
  val dummy_queue : unit queue = new queue

  (** Wait that all tasks which are currently scheduled terminate, synchronously.
      In the mean time more tasks can be scheduled as usual. *)
  method wait_for_all_currently_scheduled_tasks =
    (* Schedule a marker task whose only effect is to signal us through
       [dummy_queue], then wait for that signal: *)
    self#schedule
      ~name:"wait until all scheduled tasks terminate"
      (fun () -> dummy_queue#enqueue ());
    Log.printf "Waiting for all currently enqueued tasks to terminate...\n";
    let () = dummy_queue#dequeue in
    Log.printf "...all right, we have been signaled: tasks did terminate.\n"

  (** Schedule a single composite task which runs all the given thunks
      concurrently, one thread each, and terminates when all those threads
      have been joined. The composite task is prepended to the queue rather
      than appended. An exception escaping one of the thunks is logged along
      with the name of the thunk, as [do_in_parallel] does, instead of being
      left uncaught in its thread. *)
  method schedule_parallel (names_and_thunks : (string * thunk) list) =
    (* The composite name lists the component names, separated by bars: *)
    let parallel_task_name =
      "In parallel: "
      ^ String.concat " || " (List.map (fun (name, _) -> name) names_and_thunks) in
    let parallel_task_thunk =
      fun () ->
        (* Start all the threads first... *)
        let threads =
          List.map
            (fun (name, thunk) ->
              name,
              Thread.create
                (fun () ->
                  try
                    thunk ()
                  with e ->
                    Log.printf2
                      "!!!! schedule_parallel: the thunk \"%s\" failed (%s)\n"
                      name
                      (Printexc.to_string e))
                ())
            names_and_thunks in
        (* ...then wait for each of them in turn. Success is only reported
           when the join did not raise: *)
        List.iter
          (fun (name, thread) ->
            Log.printf1 "Joining \"%s\"...\n" name;
            try
              Thread.join thread;
              Log.printf1 "I have joined \"%s\" with success\n" name
            with e ->
              Log.printf1
                "!!!!!!!!!!!!!!! This should not happen: join failed (%s)\n"
                (Printexc.to_string e))
          threads in
(*     self#schedule ~name:parallel_task_name parallel_task_thunk *)
    self#prepend ~name:parallel_task_name parallel_task_thunk

  (** A user-friendly way to schedule a set of tasks with a dependency graph.
      The description is a list of triples
      <name, list of dependencies as names, thunk>.
      Tasks are scheduled in the order given by [topological_sort] on the
      graph built from the description.
      @raise Not_found if a dependency names a task which does not occur
      in the description. *)
  method schedule_tasks description =
    let g = make_empty_graph () in
    (* First make the nodes, and a local name -> id table. The table is sized
       after the description, since the graph is still empty at this point: *)
    let table = Hashtbl.create ((List.length description) * 2) in
    List.iter
      (fun (name, _, thunk) ->
        let id = add_node (name, thunk) g in
        Hashtbl.add table name id)
      description;
    (* Now make edges, one from each task to each of its dependencies: *)
    List.iter
      (fun (name, dependencies, _) ->
        List.iter
          (fun a_dependency ->
            add_edge
              (Hashtbl.find table name)
              (Hashtbl.find table a_dependency)
              g)
          dependencies)
      description;
    (* Ok, we have the graph. Now we can use it to schedule tasks in some
       reasonable order: *)
    List.iter
      (fun id ->
        let name, thunk = get_node id g in
        self#schedule ~name thunk)
      (topological_sort g)

(*  method terminate =
    self#prepend
      ~name:"Destroying the task runner"
      (fun () -> raise Kill_task_runner)*)
end;;

(** A ready-made task runner instance, created when this module is
    initialized; creating it runs the class initializer, which starts the
    worker thread. *)
let the_task_runner : task_runner = new task_runner;;

(*
let g =
  schedule_tasks
    [ "a", [],
      (fun () -> print_string "a\n");
      "b", ["c"],
      (fun () -> print_string "b\n");
      "c", ["a"],
      (fun () -> print_string "c\n"); ];;

let sorted_nodes = topological_sort g;;
let _ =
  List.iter
    (fun id ->
      let name, thunk = get_node id g in
      Log.printf "Executing %s\n" name;
      thunk ();)
    sorted_nodes;;
*)

(* ------------------------------------------------------------------------------------ *)
(* Example: *)
(*
let _ =
  schedule_tasks
    [ "a", [],
      (fun () -> print_string "(a) This will come first.\n");
      "b", ["c"],
      (fun () -> print_string "(b) This will come after c.\n");
      "c", ["a"],
      (fun () -> print_string "(c) This will come after a.\n"); ];;
Unix.sleep 3;;
*)