File: timeout.sml

package info (click to toggle)
mlton 20100608-2
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 34,980 kB
  • ctags: 69,089
  • sloc: ansic: 18,421; lisp: 2,879; makefile: 1,570; sh: 1,325; pascal: 256; asm: 97
file content (162 lines) | stat: -rw-r--r-- 6,148 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
(* timeout.sml
 * 2004 Matthew Fluet (mfluet@acm.org)
 *  Ported to MLton threads.
 *)

(* timeout.sml
 *
 * COPYRIGHT (c) 1995 AT&T Bell Laboratories.
 * COPYRIGHT (c) 1989-1991 John H. Reppy
 *
 * Events for synchronizing on timeouts.
 *)

structure TimeOut : TIME_OUT_EXTRA =
   struct
      structure Assert = LocalAssert(val assert = false)
      structure Debug = LocalDebug(val debug = false)

      structure S = Scheduler
      structure E = Event
      (* debug: forward a delayed message (a unit -> string thunk) to
       * Debug.sayDebug, together with S.atomicMsg and S.tidMsg.
       * debug': same, but for a message that is already a plain string.
       *)
      fun debug mkMsg = Debug.sayDebug ([S.atomicMsg, S.tidMsg], mkMsg)
      fun debug' text = Debug.sayDebug ([S.atomicMsg, S.tidMsg], fn () => text)

      datatype trans_id = datatype TransID.trans_id
      datatype trans_id_state = datatype TransID.trans_id_state


      (* Cached approximation of the current time of day.  NONE means that
       * no reading is cached: getTime fills the cache on demand and
       * preemptTime empties it again (it is cleared at each pre-emption).
       *)
      val clock : Time.time option ref = ref NONE

      (* Return an approximation of the current time of day (at least as
       * accurate as the time quantum).  A cached reading is returned as-is;
       * otherwise Time.now () is sampled, cached, and returned.
       *)
      fun getTime () =
         case !clock of
            SOME cached => cached
          | NONE =>
               let
                  val fresh = Time.now ()
               in
                  clock := SOME fresh
                  ; fresh
               end

      (* Drop the cached reading so the next getTime call samples the clock. *)
      fun preemptTime () = (clock := NONE)

      (* The queue of threads waiting for timeouts.
       * It is sorted in increasing order of time value.
       *
       * TQ is a functional priority queue keyed by Time.time; because it is
       * functional, the current queue is kept in the `timeQ` ref and replaced
       * wholesale on every update.
       *)
      structure TQ = FunPriorityQueue(structure Key = struct open Time type t = time end)
      (* A waiting entry: the transaction id of the synchronization, the
       * clean-up thunk to run when the entry fires, and the ready thread to
       * reschedule at that point (see cleaner).
       *)
      type item = trans_id * (unit -> unit) * S.rdy_thread
      (* Current contents of the timeout queue; starts out empty. *)
      val timeQ : item TQ.t ref = ref (TQ.new ())

      (* Predicate handed to the queue's cleaning operations; it reports
       * whether `elt` is finished with (true) or must stay queued (false).
       *
       *  - An entry whose transaction has been cancelled is finished.
       *  - An entry whose key is at or before the current time is fired:
       *    `readied` is notified, the thread is made ready, the entry's
       *    clean-up thunk runs, and the entry is finished.
       *  - Any other entry stays.
       *)
      fun cleaner (readied: unit -> unit) elt =
         let
            val now = getTime ()
            val (TXID state, cleanUp: unit -> unit, thread) = TQ.Elt.value elt
            (* Wake the waiting thread; always yields true. *)
            fun fire () =
               (readied ()
                ; S.ready thread
                ; cleanUp ()
                ; true)
         in
            case !state of
               CANCEL => true
             | _ => Time.<=(TQ.Elt.key elt, now) andalso fire ()
         end

      (* Queue thread `t` to be woken at absolute time `time`.  Must be
       * called inside an atomic region.  The insertion also cleans the
       * queue, using a cleaner whose "readied" notification does nothing.
       *)
      fun timeWait (time, txid, cleanUp, t) =
         let
            val () = Assert.assertAtomic' ("TimeOut.timeWait", NONE)
            val entry = (txid, cleanUp, t)
         in
            timeQ := TQ.enqueAndClean (!timeQ, time, entry, cleaner (fn () => ()))
         end

      (** NOTE: unlike for most base events, the block functions of time-out
       ** events do not have to exit the atomic region or execute the clean-up
       ** operation.  This is done when they are removed from the waiting queue.
       **)
      (* Event that fires once the relative interval `time` has elapsed.
       * A non-positive interval is enabled immediately at poll time;
       * otherwise the synchronizing thread is parked on the timeout queue
       * with deadline (time + current time).
       *)
      fun timeOutEvt time =
         let
            (* Blocking path: entered inside an atomic region, left outside it. *)
            fun blockFn {transId, cleanUp, next} =
               let
                  val () = Assert.assertAtomic' ("TimeOut.timeOutEvt.blockFn", NONE)
                  val () = debug' "timeOutEvt(3.2.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("TimeOut.timeOutEvt(3.2.1)", SOME 1)
                  (* Queue the current thread under its absolute deadline,
                   * then hand control to the next thread. *)
                  fun park t =
                     let
                        val deadline = Time.+(time, getTime ())
                        val () = timeWait (deadline, transId, cleanUp, S.prep t)
                     in
                        next ()
                     end
                  val () = S.atomicSwitch park
                  val () = debug' "timeOutEvt(3.2.3)" (* NonAtomic *)
               in
                  Assert.assertNonAtomic' "TimeOut.timeOutEvt(3.2.3)"
               end
            (* Polling path: decide between "already enabled" and "must block". *)
            fun pollFn () =
               let
                  val () = Assert.assertAtomic' ("TimeOut.timeOutEvt.pollFn", NONE)
                  val () = debug' "timeOutEvt(2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("TimeOut.timeOutEvt(2)", SOME 1)
               in
                  if Time.>(time, Time.zeroTime)
                     then E.blocked blockFn
                     else E.enabled {prio = ~1, doitFn = S.atomicEnd}
               end
         in
            E.bevt pollFn
         end

      (* Event that fires at the absolute time `time`.  If that time is at or
       * before the current (approximate) time the event is enabled at poll
       * time; otherwise the synchronizing thread is parked on the timeout
       * queue with `time` as its deadline.
       *)
      fun atTimeEvt time =
         let
            (* Blocking path: entered inside an atomic region, left outside it. *)
            fun blockFn {transId, cleanUp, next} =
               let
                  val () = Assert.assertAtomic' ("TimeOut.atTimeEvt.blockFn", NONE)
                  val () = debug' "atTimeEvt(3.2.1)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("TimeOut.atTimeEvt(3.2.1)", SOME 1)
                  (* Queue the current thread under the requested deadline,
                   * then hand control to the next thread. *)
                  fun park t =
                     let
                        val () = timeWait (time, transId, cleanUp, S.prep t)
                     in
                        next ()
                     end
                  val () = S.atomicSwitch park
                  val () = debug' "atTimeEvt(3.2.3)" (* NonAtomic *)
               in
                  Assert.assertNonAtomic' "TimeOut.atTimeEvt(3.2.3)"
               end
            (* Polling path: decide between "already enabled" and "must block". *)
            fun pollFn () =
               let
                  val () = Assert.assertAtomic' ("TimeOut.atTimeEvt.pollFn", NONE)
                  val () = debug' "atTimeEvt(2)" (* Atomic 1 *)
                  val () = Assert.assertAtomic' ("TimeOut.atTimeEvt(2)", SOME 1)
               in
                  if Time.>(time, getTime ())
                     then E.blocked blockFn
                     else E.enabled {prio = ~1, doitFn = S.atomicEnd}
               end
         in
            E.bevt pollFn
         end

      (* Reset this module's queue state: replace the timeout queue with a
       * fresh, empty one.  The cached clock is not touched.
       *)
      fun reset () =
         let
            val emptyQ = TQ.new ()
         in
            timeQ := emptyQ
         end

      (* What to do at a preemption.  Must be called inside an atomic region.
       *
       * The cached clock is invalidated, then (if anything is waiting) the
       * timeout queue is cleaned, which wakes every entry that is due.
       * Result:
       *   NONE          - the queue was empty, or is empty after cleaning
       *                   without any thread having been readied;
       *   SOME NONE     - at least one waiting thread was readied;
       *   SOME (SOME d) - nothing was readied; d is the key of the entry
       *                   returned by TQ.peek minus the current time.
       *)
      fun preempt () : Time.time option option =
         let
            val () = Assert.assertAtomic' ("TimeOut.preempt", NONE)
            val () = debug' "TimeOut.preempt" (* Atomic 1 *)
            val () = Assert.assertAtomic' ("TimeOut.preempt", SOME 1)
            val () = preemptTime ()
            val pending = !timeQ
            (* Clean a non-empty queue, store the remainder, and summarize. *)
            fun sweep () =
               let
                  val woke = ref false
                  val remaining = TQ.clean (pending, cleaner (fn () => woke := true))
                  val () = timeQ := remaining
               in
                  if !woke
                     then SOME NONE
                     else
                        case TQ.peek remaining of
                           SOME elt => SOME (SOME (Time.-(TQ.Elt.key elt, getTime ())))
                         | NONE => NONE
               end
         in
            if TQ.empty pending
               then NONE
               else sweep ()
         end
   end