1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
type call = unit -> Process.t
(* this is used to control the scheduler behavior
* from the idle function *)
type 'a t = Terminate
| WaitingTask
| AddProcess of ('a * Process.t)
| AddTask of ('a * (call list list))
| Retry
| FinishTask of 'a
let to_string = function
| Terminate -> "terminate"
| WaitingTask -> "waiting-task"
| AddProcess (_,_) -> "add-process"
| AddTask (_,_) -> "add-task"
| Retry -> "retry"
| FinishTask _ -> "finish-task"
type 'a task_group = {
mutable completion : int;
mutable next : ('a * call) list list;
}
type stats = {
mutable max_runqueue : int;
mutable nb_processes : int;
}
type 'a state = {
mutable runqueue : ('a * Process.t) list;
mutable waitqueue : ('a * call) list;
mutable terminate : bool;
mutable waiting_task : bool;
mutable tasks : ('a * 'a task_group) list;
}
(* wait until a process finish. *)
let wait_process state =
let (proc_done, processes) = Process.wait state.runqueue in
let (task_done,_) = proc_done in
let finished_task =
try
let tg = List.assoc task_done state.tasks in
tg.completion <- tg.completion - 1;
if tg.completion = 0
then (
match tg.next with
| [] ->
state.tasks <- List.filter (fun (t,_) -> t <> task_done) state.tasks;
true
| g :: gs ->
tg.completion <- List.length g;
tg.next <- gs;
state.waitqueue <- g @ state.waitqueue;
false
) else
false
with Not_found ->
true
in
state.runqueue <- processes;
(proc_done, finished_task)
let rec idle_loop idle_fun on_task_finish_fun state =
match idle_fun () with
| Retry -> idle_loop idle_fun on_task_finish_fun state
| AddProcess p -> state.runqueue <- p :: state.runqueue
| WaitingTask -> state.waiting_task <- true
| Terminate -> state.terminate <- true
| FinishTask t -> on_task_finish_fun t; (* retry *) idle_loop idle_fun on_task_finish_fun state
| AddTask (t,ps) ->
(match List.map (List.map (fun p -> (t, p))) ps with
| [] -> failwith "internal error: empty task added to the scheduler"
| first::pss ->
let tg = {
completion = List.length first;
next = pss
} in
state.tasks <- (t,tg) :: state.tasks;
state.waitqueue <- first @ state.waitqueue;
)
(* when the scheduler has some room, we get the next task from
* taskdep and either start a process or call retry.
*
* Retry is returned when no process need to be spawned for the next task
* since the dependencies have not changed and thus the cache still have
* valid target file. Instead of returning retry, we could just go get
* the next task ourself.
*)
let schedule_idle taskdep dispatch_fun () =
if Taskdep.is_complete taskdep
then Terminate
else match Taskdep.get_next taskdep with
| None -> WaitingTask
| Some task -> dispatch_fun task
(* this is a simple one thread loop to schedule
* multiple tasks (forked) until they terminate
*
* the idle_fun function is called when there's capacity in the runqueue for
* another task.
*
* the finish function is called when a subtask of the task has finished.
* if all the subtasks in the task are done then the second value is set
* to true.
**)
let schedule j taskdep dispatch_fun finish_fun =
let st = {
runqueue = [];
waitqueue = [];
terminate = false;
waiting_task = false;
tasks = [];
} in
let on_task_finish task = Taskdep.mark_done taskdep task in
let stats = { max_runqueue = 0; nb_processes = 0 } in
let pick_process (task, process) remaining_processes =
stats.nb_processes <- stats.nb_processes + 1;
st.runqueue <- (task,process ()) :: st.runqueue;
st.waitqueue <- remaining_processes
in
let set_max () =
let m = List.length st.runqueue in
if stats.max_runqueue < m then stats.max_runqueue <- m
in
(* add more bulletproofing to prevent busy looping for no reason
* if user of this api is not behaving properly *)
while not st.terminate || st.runqueue <> [] || st.waitqueue <> [] do
while not st.terminate && not st.waiting_task && List.length st.runqueue < j do
match st.waitqueue with
| [] -> idle_loop (schedule_idle taskdep dispatch_fun) on_task_finish st
| (t,p)::procs -> pick_process (t,p) procs
done;
set_max ();
if List.length st.runqueue > 0
then
let (proc_done, finished_task) = wait_process st in
st.waiting_task <- false;
finish_fun proc_done finished_task
else
assert (st.terminate)
done;
stats
|