File: timer.ml

package info (click to toggle)
janest-core 107.01-5
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 2,440 kB
  • sloc: ml: 26,624; ansic: 2,498; sh: 49; makefile: 29
file content (236 lines) | stat: -rw-r--r-- 8,628 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
(******************************************************************************
 *                             Core                                           *
 *                                                                            *
 * Copyright (C) 2008- Jane Street Holding, LLC                               *
 *    Contact: opensource@janestreet.com                                      *
 *    WWW: http://www.janestreet.com/ocaml                                    *
 *                                                                            *
 *                                                                            *
 * This library is free software; you can redistribute it and/or              *
 * modify it under the terms of the GNU Lesser General Public                 *
 * License as published by the Free Software Foundation; either               *
 * version 2 of the License, or (at your option) any later version.           *
 *                                                                            *
 * This library is distributed in the hope that it will be useful,            *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU          *
 * Lesser General Public License for more details.                            *
 *                                                                            *
 * You should have received a copy of the GNU Lesser General Public           *
 * License along with this library; if not, write to the Free Software        *
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA  *
 *                                                                            *
 ******************************************************************************)

open Std_internal

module Mutex = Mutex0

(* How an event is re-armed by [run_timer] once it has come due. *)
type interval =
  | INone
      (* One-shot: [run_timer] removes the event from the heap when it
         comes due. *)
  | INormal of Span.t
      (* Periodic: when the event comes due it is rescheduled at the timer
         thread's current [now] plus the given span. *)
  | IRandom of Span.t * float
      (* Periodic with jitter, carrying a span and a max_ratio: the next
         time is [now] plus the span plus the span scaled by max_ratio
         times p, where p is [Random.float 2.0 -. 1.] drawn afresh each
         time.  [get_interval_param] checks max_ratio against the range
         [0.0, 1.0] before building this constructor. *)

(* Life cycle of a timer.  [create] starts in [Activated]; [deactivate]
   moves an activated timer to [Deactivating]; the timer thread moves it to
   [Deactivated] just before it stops looping. *)
type status = Activated | Deactivating | Deactivated

(* Mutex [mtx] must be held when modifying [status] and [events].  Such
   modifications require that the timer thread and other threads potentially
   waiting on the condition variable be woken up.  Hence the condition
   variable always needs to be broadcast thereafter.  [wrap_update] makes
   this safe and easy.  The timer thread may also wake up while waiting on
   the condition variable when the timer expires. *)
type t =
  {
    (* Life-cycle state of the timer; see [status]. *)
    mutable status : status;
    (* Pending events.  [create] builds this heap with a comparison on the
       [time] field of events. *)
    events : event Heap.t;
    (* Mutex held while [status] and [events] are read or modified. *)
    mtx : Mutex.t;
    (* Condition variable used together with [mtx]: the timer thread waits
       on it, and [deactivate] waits on it for the shutdown handshake. *)
    cnd : Condition.t;
    (* Clock reading cached by the timer thread.  It is set in [create] and
       refreshed by [run_timer] after each wait on [cnd]. *)
    mutable now : Time.t;
  }

and event =
  {
    (* Time at which the event is next due. *)
    mutable time : Time.t;
    (* Re-arming policy applied when the event comes due; see [interval]. *)
    mutable interval : interval;
    (* Callback run by the timer thread; it receives the event itself and
       the timer thread's cached [now]. *)
    handler : event -> Time.t -> unit;
    (* Timer this event was added to. *)
    timer : t;
    (* Handle of this event inside [timer.events].  [add_abs] creates the
       record with [None] and stores [Some _] right after pushing it onto
       the heap. *)
    mutable t_event_opt : event Heap.heap_el option;
  }

(* Body of the timer thread ([create] starts it with [Thread.create]).
   It locks [timer.mtx] and then loops: sleep until the earliest event is
   due, re-arm or remove that event, and run its handler.  The loop ends
   when it observes [Deactivating]; it then marks the timer [Deactivated],
   broadcasts on [timer.cnd], releases the mutex and returns. *)
let run_timer timer =
  let mtx = timer.mtx in
  let rec handle_events () =
    (* Assumes that mutex is held *)
    match timer.status with
    | Deactivating ->
        (* Shutdown was requested: acknowledge it, wake up whoever waits on
           the condition variable, and leave the loop (no recursive call). *)
        timer.status <- Deactivated;
        Condition.broadcast timer.cnd;
        Mutex.unlock mtx
    | Deactivated -> assert false  (* impossible *)
    | Activated ->
      match Heap.top_heap_el timer.events with
      | None ->
          (* Nothing scheduled: sleep until somebody broadcasts, then
             refresh the cached clock and look at the state again. *)
          Condition.wait timer.cnd mtx;
          timer.now <- Time.now ();
          handle_events ()
      | Some event ->
          let ev = Heap.heap_el_get_el event in
          let sched_time = ev.time in
          (* The comparison uses the cached clock, not a fresh reading. *)
          let now = timer.now in
          if Time.(<) now sched_time then begin
            (* Event in the future, wait until then or until signal *)
            (* The boolean result is ignored: the loop re-examines the
               state whatever the reason for waking up. *)
            ignore (Core_condition.timedwait timer.cnd mtx sched_time : bool);
            timer.now <- Time.now ()
          end else begin
            (* Update event on the heap as necessary *)
            (* This happens before the handler runs and while the mutex is
               still held.  Periodic events are re-armed relative to the
               cached [now], not relative to their previous [time]. *)
            begin match ev.interval with
            | INone -> Heap.remove event
            | INormal span ->
                ev.time <- Time.add now span;
                Heap.update event ev
            | IRandom (span, max_ratio) ->
                (* [p] is [Random.float 2.0] shifted down by one, so the
                   jitter added to the span can be negative or positive
                   and is scaled by [max_ratio]. *)
                let p2 = Random.float 2.0 in
                let p = p2 -. 1. in
                let confusion = Span.scale span (max_ratio *. p) in
                ev.time <- Time.add (Time.add now span) confusion;
                Heap.update event ev
            end;
            (* The mutex is released for the duration of the callback and
               re-acquired afterwards. *)
            Mutex.unlock mtx;
            begin
              (* An exception raised by the handler is printed to stderr
                 and otherwise dropped, so the loop carries on. *)
              try ev.handler ev now
              with e ->
               eprintf "Timer.run: Exception in event handler: %s\n%!"
                 (Exn.to_string e)
            end;
            Mutex.lock mtx
            (* NOTE: [timer.now] is not refreshed on this path; the next
               iteration still sees the value from the last wait. *)
          end;
          handle_events ()
  in
  Mutex.lock mtx;
  handle_events ()

(* [create ?min_size ()] builds a fresh, activated timer and starts its
   worker thread running [run_timer].  [min_size] (default 1000) is passed
   on to [Heap.create] for the event heap, which orders events by their
   [time] field.  The handle of the worker thread is not kept. *)
let create ?(min_size = 1000) () =
  let by_due_time ev1 ev2 = Time.compare ev1.time ev2.time in
  let timer =
    {
      status = Activated;
      events = Heap.create ~min_size by_due_time;
      mtx = Mutex.create ();
      cnd = Condition.create ();
      now = Time.now ();
    }
  in
  ignore (Thread.create run_timer timer : Thread.t);
  timer

(* [size timer] is the number of events currently on the heap of [timer],
   read while holding the timer mutex. *)
let size timer =
  let count_pending () = Heap.length timer.events in
  Mutex.critical_section timer.mtx ~f:count_pending

(* [deactivate timer] asks the timer thread to stop and blocks until the
   timer has reached [Deactivated].  If the timer is still [Activated] the
   request is posted here; if another caller already posted it, this call
   just waits for the same acknowledgement.  Calling it on a timer that is
   already [Deactivated] returns immediately.

   NOTE: only [run_timer] ever sets [Deactivated], so this presumably must
   not be called from an event handler, which runs on the timer thread. *)
let deactivate timer =
  Mutex.critical_section timer.mtx ~f:(fun () ->
    (* Post the shutdown request if nobody has done so yet. *)
    begin match timer.status with
    | Activated ->
        timer.status <- Deactivating;
        Condition.broadcast timer.cnd
    | Deactivating | Deactivated -> ()
    end;
    (* Wait for the timer thread to acknowledge; re-check after each
       wake-up since the broadcast may have been meant for someone else. *)
    while timer.status <> Deactivated do
      Condition.wait timer.cnd timer.mtx
    done)

(* [check_span loc span] raises [Invalid_argument] if [span] is negative.
   [loc] is the name of the calling function and only appears in the error
   message. *)
let check_span loc span =
  match Span.(<) span Span.zero with
  | true -> invalid_arg (sprintf "Timer.%s: span < 0" loc)
  | false -> ()

(* [get_interval_param loc randomize interval] turns the optional arguments
   of the public scheduling functions into an [interval]:

   - no interval: [INone] ([randomize] is then ignored and not checked);
   - an interval without [randomize]: [INormal span];
   - an interval with [randomize = Some max_ratio]: [IRandom (span, max_ratio)].

   [loc] is the name of the calling function, used in error messages.

   Raises [Invalid_argument] if the span is negative (via [check_span]) or
   if [max_ratio] is not within [0.0, 1.0].  NaN is rejected as well. *)
let get_interval_param loc randomize = function
  | None -> INone
  | Some span ->
      check_span loc span;
      begin match randomize with
      | None -> INormal span
      | Some max_ratio ->
          (* Written as a negated inclusion test on purpose: every ordered
             comparison involving NaN is false, so a test of the form
             [max_ratio < 0. || 1. < max_ratio] would let NaN through and
             later yield an event time that cannot be ordered. *)
          if not (0. <= max_ratio && max_ratio <= 1.) then
            invalid_arg (
              sprintf "Timer.%s: max_ratio not in range [0.0, 1.0]" loc);
          IRandom (span, max_ratio)
      end

(* [wrap_update timer ~f] runs [f ()] with the timer mutex held and returns
   its result.  Afterwards the condition variable is broadcast only when
   the timer thread may have to wake up earlier than it planned to:

   - the heap is empty after [f]: no broadcast;
   - the heap was empty before [f] and is not any more: broadcast;
   - otherwise: broadcast unless the time of the element that was on top
     before [f] is [<=] the time of the element on top after [f].

   Events are mutable records, so the time of the former top element is
   read after [f] has run. *)
let wrap_update timer ~f =
  Mutex.critical_section timer.mtx ~f:(fun () ->
    let earliest_before = Heap.top timer.events in
    let result = f () in
    let must_wake_up =
      match earliest_before, Heap.top timer.events with
      | _, None -> false
      | None, Some _ -> true
      | Some before, Some after ->
          not (Time.(<=) before.time after.time)
    in
    if must_wake_up then Condition.broadcast timer.cnd;
    result)


(* [add_abs timer handler ?randomize ?interval time] schedules [handler] to
   be run at the absolute time [time] and returns the new event.
   [interval] and [randomize] select the re-arming policy as computed by
   [get_interval_param].

   Raises [Invalid_argument] for a negative interval or an out-of-range
   [randomize], and [Failure] if the timer is no longer activated. *)
let add_abs timer handler ?randomize ?interval time =
  let interval = get_interval_param "add_abs" randomize interval in
  let schedule () =
    match timer.status with
    | Deactivating | Deactivated ->
        failwith "Timer.add_abs: timer deactivated"
    | Activated ->
        let event =
          {
            time = time;
            interval = interval;
            handler = handler;
            timer = timer;
            t_event_opt = None;
          }
        in
        (* The heap handle only exists once the event has been pushed, so
           it is filled in afterwards. *)
        event.t_event_opt <- Some (Heap.push timer.events event);
        event
  in
  wrap_update timer ~f:schedule

(* [add t handler ?randomize ?interval span] schedules [handler] to be run
   [span] from now; it is [add_abs] with the time [Time.now ()] plus
   [span].  Raises [Invalid_argument] if [span] is negative, in addition
   to whatever [add_abs] raises. *)
let add t handler ?randomize ?interval span =
  check_span "add" span;
  let due = Time.add (Time.now ()) span in
  add_abs t handler ?randomize ?interval due

(* [remove event] takes [event] off the heap of its timer.  Nothing happens
   if its heap handle is no longer valid.  Raises [Failure] if the timer
   is no longer activated. *)
let remove ev =
  match ev.t_event_opt with
  | None -> assert false  (* impossible *)
  | Some heap_el ->
      let timer = ev.timer in
      wrap_update timer ~f:(fun () ->
        match timer.status with
        | Deactivating | Deactivated ->
            failwith "Timer.remove: timer deactivated"
        | Activated ->
            if Heap.heap_el_is_valid heap_el then Heap.remove heap_el)

(* [reschedule ev ?randomize ?interval span] moves [ev] to [span] later
   than its current [time] and replaces its re-arming policy with the one
   given by [interval] and [randomize].

   Raises [Invalid_argument] for a negative [span] or interval or an
   out-of-range [randomize]; raises [Failure] if the timer is no longer
   activated or if the heap handle of [ev] is no longer valid. *)
let reschedule ev ?randomize ?interval span =
  let heap_el =
    match ev.t_event_opt with
    | Some heap_el -> heap_el
    | None -> assert false  (* impossible *)
  in
  let loc = "reschedule" in
  check_span loc span;
  let interval = get_interval_param loc randomize interval in
  let timer = ev.timer in
  wrap_update timer ~f:(fun () ->
    match timer.status with
    | Deactivating | Deactivated ->
        failwith "Timer.reschedule: timer deactivated"
    | Activated ->
        if not (Heap.heap_el_is_valid heap_el) then
          failwith "Timer.reschedule: event not scheduled";
        ev.time <- Time.add ev.time span;
        ev.interval <- interval;
        Heap.update heap_el ev)

(* Plain accessors.  None of them takes the timer mutex. *)

(* Timer the event was added to. *)
let get_timer { timer = owner } = owner

(* Time at which the event is currently due. *)
let get_event_time { time = due } = due

(* Current re-arming policy of the event. *)
let get_event_interval { interval = policy } = policy

(* [true] exactly when the timer is in the [Activated] state. *)
let is_activated timer =
  match timer.status with
  | Activated -> true
  | Deactivating | Deactivated -> false