1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
|
(******************************************************************************
* Core *
* *
* Copyright (C) 2008- Jane Street Holding, LLC *
* Contact: opensource@janestreet.com *
* WWW: http://www.janestreet.com/ocaml *
* *
* *
* This library is free software; you can redistribute it and/or *
* modify it under the terms of the GNU Lesser General Public *
* License as published by the Free Software Foundation; either *
* version 2 of the License, or (at your option) any later version. *
* *
* This library is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
* Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public *
* License along with this library; if not, write to the Free Software *
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA *
* *
******************************************************************************)
open Std_internal
module Mutex = Mutex0
(* Recurrence policy of an event, consulted by the timer thread each time the
   event fires. *)
type interval =
(* One-shot: the event is removed from the heap when it fires. *)
| INone
(* Periodic: after firing, the event is rescheduled to the timer's current
   [now] plus the given span. *)
| INormal of Span.t
(* Periodic with jitter: [IRandom (span, max_ratio)] reschedules to
   [now + span + span * max_ratio * p], where [p] is drawn afresh on every
   firing as [Random.float 2.0 -. 1.]. *)
| IRandom of Span.t * float
(* Life cycle of a timer.  [create] starts a timer as [Activated];
   [deactivate] moves it to [Deactivating]; the timer thread then sets
   [Deactivated] just before it stops looping. *)
type status = Activated | Deactivating | Deactivated
(* Mutex [mtx] must be held when modifying [status] and [events].  Such
   modifications require that the timer thread and other threads potentially
   waiting on the condition variable be woken up.  Hence the condition
   variable always needs to be broadcast thereafter.  [wrap_update] makes
   this safe and easy.  The timer thread may also wake up while waiting on
   the condition variable when the timer expires. *)
type t =
{
(* Current life-cycle state of the timer. *)
mutable status : status;
(* Pending events; [create] builds this heap with a comparison on
   [event.time]. *)
events : event Heap.t;
(* Protects [status] and [events]. *)
mtx : Mutex.t;
(* Waited on by the timer thread and by callers of [deactivate]. *)
cnd : Condition.t;
(* Clock reading cached by the timer thread; it is refreshed with
   [Time.now ()] each time the thread returns from waiting on [cnd]. *)
mutable now : Time.t;
}
and event =
{
(* Time at which the event is next due. *)
mutable time : Time.t;
(* Recurrence policy applied when the event fires. *)
mutable interval : interval;
(* Callback run by the timer thread; it receives the event itself and the
   timer's cached [now] at the moment of firing. *)
handler : event -> Time.t -> unit;
(* The timer this event was added to. *)
timer : t;
(* Handle of this event inside [timer.events].  [add_abs] fills it in right
   after pushing the event, so other functions treat [None] as impossible. *)
mutable t_event_opt : event Heap.heap_el option;
}
(* Main loop of the timer thread (started by [create]).  It locks
   [timer.mtx] and then repeatedly inspects the earliest event on the heap:
   it sleeps on [timer.cnd] while there is nothing due, and otherwise
   reschedules or removes the event and runs its handler.  The loop ends,
   releasing the mutex, once the status is found to be [Deactivating]. *)
let run_timer timer =
let mtx = timer.mtx in
let rec handle_events () =
(* Assumes that mutex is held *)
match timer.status with
| Deactivating ->
(* Shutdown was requested: record completion, wake every thread waiting on
   the condition variable, release the mutex and stop looping. *)
timer.status <- Deactivated;
Condition.broadcast timer.cnd;
Mutex.unlock mtx
| Deactivated -> assert false (* impossible *)
| Activated ->
match Heap.top_heap_el timer.events with
| None ->
(* No events at all: sleep until some other thread broadcasts, then refresh
   the cached clock and look again. *)
Condition.wait timer.cnd mtx;
timer.now <- Time.now ();
handle_events ()
| Some event ->
let ev = Heap.heap_el_get_el event in
let sched_time = ev.time in
(* The due-check uses the cached clock, not a fresh [Time.now ()]. *)
let now = timer.now in
if Time.(<) now sched_time then begin
(* Event in the future, wait until then or until signal *)
ignore (Core_condition.timedwait timer.cnd mtx sched_time : bool);
timer.now <- Time.now ()
end else begin
(* Update event on the heap as necessary *)
begin match ev.interval with
| INone -> Heap.remove event
| INormal span ->
ev.time <- Time.add now span;
Heap.update event ev
| IRandom (span, max_ratio) ->
(* [p] lies in [-1, 1]; the next firing is shifted by
   [span * max_ratio * p] around [now + span]. *)
let p2 = Random.float 2.0 in
let p = p2 -. 1. in
let confusion = Span.scale span (max_ratio *. p) in
ev.time <- Time.add (Time.add now span) confusion;
Heap.update event ev
end;
(* The handler runs with the mutex released, so other threads can use the
   timer meanwhile.  An exception raised by the handler is caught and
   reported through [eprintf]; the loop then carries on. *)
Mutex.unlock mtx;
begin
try ev.handler ev now
with e ->
eprintf "Timer.run: Exception in event handler: %s\n%!"
(Exn.to_string e)
end;
Mutex.lock mtx
end;
handle_events ()
in
Mutex.lock mtx;
handle_events ()
(* [create ?min_size ()] builds a fresh, activated timer and starts the
   thread that services it.  [min_size] (default 1000) is passed on to
   [Heap.create] for the event heap, which is ordered by event time. *)
let create ?(min_size = 1000) () =
  let earliest_first a b = Time.compare a.time b.time in
  let heap = Heap.create ~min_size earliest_first in
  let timer =
    { status = Activated;
      events = heap;
      mtx = Mutex.create ();
      cnd = Condition.create ();
      now = Time.now ();
    }
  in
  (* The thread handle is not kept: the thread is stopped through
     [deactivate], not by joining it. *)
  let _worker = Thread.create run_timer timer in
  timer
(* [size timer] is the number of events currently on the timer's heap, read
   while holding the timer's mutex. *)
let size timer =
  let count_events () = Heap.length timer.events in
  Mutex.critical_section timer.mtx ~f:count_events
(* [deactivate timer] asks the timer thread to stop and blocks until the
   status has become [Deactivated].  If the timer is already deactivated it
   returns at once; if another caller has already requested shutdown it
   just waits for that shutdown to complete. *)
let deactivate timer =
  let rec await_shutdown () =
    match timer.status with
    | Deactivated -> ()
    | Deactivating ->
      Condition.wait timer.cnd timer.mtx;
      await_shutdown ()
    | Activated ->
      (* First request: flag it and wake the timer thread so it notices. *)
      timer.status <- Deactivating;
      Condition.broadcast timer.cnd;
      Condition.wait timer.cnd timer.mtx;
      await_shutdown ()
  in
  Mutex.critical_section timer.mtx ~f:await_shutdown
(* [check_span loc span] raises [Invalid_argument] if [span] is below
   [Span.zero]; [loc] names the calling function in the error message. *)
let check_span loc span =
  let is_negative = Span.(<) span Span.zero in
  if is_negative then invalid_arg (sprintf "Timer.%s: span < 0" loc)
(* [get_interval_param loc randomize interval] turns the optional arguments
   of the public scheduling functions into an [interval]:
   - no interval: [INone] ([randomize] is then ignored);
   - interval only: [INormal span];
   - interval and [randomize = Some max_ratio]: [IRandom (span, max_ratio)].

   Raises [Invalid_argument] if the span is negative or if [max_ratio] is
   not within [0.0, 1.0]; [loc] names the calling function in the
   message. *)
let get_interval_param loc randomize = function
  | None -> INone
  | Some span ->
    check_span loc span;
    (match randomize with
     | None -> INormal span
     | Some max_ratio ->
       (* Written as a positive range test so that NaN is rejected as well:
          with [max_ratio < 0. || 1. < max_ratio] both comparisons are false
          for NaN, and a NaN ratio would later produce a NaN event time. *)
       if not (0. <= max_ratio && max_ratio <= 1.) then
         invalid_arg (
           sprintf "Timer.%s: max_ratio not in range [0.0, 1.0]" loc);
       IRandom (span, max_ratio))
(* [wrap_update timer ~f] runs [f ()] while holding [timer.mtx] and returns
   its result.  Afterwards the condition variable is broadcast only if the
   timer thread may now have to wake up earlier than it had planned, i.e.
   if the heap went from empty to non-empty, or if the event now at the top
   is due earlier than the one that was at the top before [f] ran. *)
let wrap_update timer ~f =
  Mutex.critical_section timer.mtx ~f:(fun () ->
    (* Snapshot the due *time* of the earliest event, not the event record:
       [f] may mutate that record in place (as [reschedule] does), and the
       comparison below would then see the new time on both sides and could
       broadcast needlessly. *)
    let earliest_before =
      match Heap.top timer.events with
      | None -> None
      | Some ev -> Some ev.time
    in
    let res = f () in
    let needs_wakeup =
      match Heap.top timer.events, earliest_before with
      | None, _ -> false (* Nothing on the queue, so no wake-up *)
      | Some _, None ->
        true (* Heap was empty and is now not, so wake up immediately *)
      | Some top_after, Some time_before ->
        (* Earlier event time at top: we have to wake up *)
        not (Time.(<=) time_before top_after.time)
    in
    if needs_wakeup then Condition.broadcast timer.cnd;
    res)
(* [add_abs timer handler ?randomize ?interval time] schedules [handler] to
   run at the absolute time [time] and returns the new event.  [interval]
   and [randomize] select the recurrence policy (see [get_interval_param]).
   Raises [Failure] if the timer is not activated, and [Invalid_argument]
   if the interval arguments are rejected. *)
let add_abs timer handler ?randomize ?interval time =
  let interval = get_interval_param "add_abs" randomize interval in
  let schedule () =
    (match timer.status with
     | Activated -> ()
     | Deactivating | Deactivated ->
       failwith "Timer.add_abs: timer deactivated");
    let event =
      { time = time;
        interval = interval;
        handler = handler;
        timer = timer;
        t_event_opt = None;
      }
    in
    (* The heap handle only exists once the event has been pushed, so it is
       filled in afterwards. *)
    event.t_event_opt <- Some (Heap.push timer.events event);
    event
  in
  wrap_update timer ~f:schedule
(* [add t handler ?randomize ?interval span] schedules [handler] to run
   [span] from the current time; see [add_abs] for the remaining arguments.
   Raises [Invalid_argument] if [span] is negative. *)
let add t handler ?randomize ?interval span =
  check_span "add" span;
  let due = Time.add (Time.now ()) span in
  add_abs t handler ?randomize ?interval due
(* [remove ev] takes [ev] off its timer's heap if it is still on it, and
   does nothing otherwise.  Raises [Failure] if the timer is not
   activated. *)
let remove ev =
  match ev.t_event_opt with
  | None -> assert false (* impossible *)
  | Some heap_el ->
    let timer = ev.timer in
    let unschedule () =
      if timer.status <> Activated then
        failwith "Timer.remove: timer deactivated";
      if Heap.heap_el_is_valid heap_el then Heap.remove heap_el
    in
    wrap_update timer ~f:unschedule
(* [reschedule ev ?randomize ?interval span] pushes the due time of [ev]
   back by [span] (relative to its currently scheduled time) and replaces
   its recurrence policy.  Raises [Invalid_argument] for a negative span or
   a rejected interval argument, and [Failure] if the timer is not
   activated or the event is no longer on the heap. *)
let reschedule ev ?randomize ?interval span =
  let heap_el =
    match ev.t_event_opt with
    | Some heap_el -> heap_el
    | None -> assert false (* impossible *)
  in
  let loc = "reschedule" in
  check_span loc span;
  let new_interval = get_interval_param loc randomize interval in
  let timer = ev.timer in
  wrap_update timer ~f:(fun () ->
    if timer.status <> Activated then
      failwith "Timer.reschedule: timer deactivated";
    if not (Heap.heap_el_is_valid heap_el) then
      failwith "Timer.reschedule: event not scheduled";
    ev.time <- Time.add ev.time span;
    ev.interval <- new_interval;
    Heap.update heap_el ev)
(* [get_timer ev] is the timer that [ev] belongs to. *)
let get_timer { timer = owner } = owner
(* [get_event_time ev] is the time at which [ev] is currently scheduled. *)
let get_event_time { time = due } = due
(* [get_event_interval ev] is the current recurrence policy of [ev]. *)
let get_event_interval { interval = policy } = policy
(* [is_activated timer] is [true] iff the timer's status is [Activated].
   The status is read without taking the timer's mutex. *)
let is_activated timer =
  match timer.status with
  | Activated -> true
  | Deactivating | Deactivated -> false
|