File: Thread.sml

package info (click to toggle)
polyml 5.2.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd, wheezy
  • size: 19,692 kB
  • ctags: 17,567
  • sloc: cpp: 37,221; sh: 9,591; asm: 4,120; ansic: 428; makefile: 203; ml: 191; awk: 91; sed: 10
file content (634 lines) | stat: -rw-r--r-- 29,209 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
(*
    Title:      Thread package for ML.
    Author:     David C. J. Matthews
    Copyright (c) 2007

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
    version 2.1 of the License, or (at your option) any later version.
    
    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.
    
    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*)

(* This signature and structure are not part of the standard basis library
   but are included here because they depend on the Time structure and are
   in turn dependencies of the BasicIO structure. *)

(* THREAD: the interface to the Poly/ML thread library.  It groups thread
   creation and control (Thread), simple mutual exclusion (Mutex) and
   condition variables (ConditionVar). *)
signature THREAD =
sig
    exception Thread of string (* Raised if an operation fails. *)
    
    structure Thread:
    sig
        (* The identity of a thread.  Compare with "equal". *)
        type thread;
        
        (* Thread attributes - This may be extended. *)
        datatype threadAttribute =
            (* Does this thread accept a broadcast interrupt?  The default is not to
               accept broadcast interrupts. *)
            EnableBroadcastInterrupt of bool
            (* How to handle interrupts.  The default is to handle interrupts synchronously.  *)
        |   InterruptState of interruptState
        
        and interruptState =
            InterruptDefer (* Defer any interrupts. *)
        |   InterruptSynch  (* Interrupts are delivered synchronously. An interrupt
               will be delayed until an interruption point.  An interruption point
               is one of: testInterrupt, ConditionVar.wait, ConditionVar.waitUntil
               and various library calls that may block, such as IO calls, pause etc.
               N.B.  Mutex.lock is not an interruption point even though it can result
               in a thread blocking for an indefinite period.  *)
        |   InterruptAsynch (* Interrupts are delivered asynchronously i.e. at a suitable
               point soon after they are triggered. *)
        |   InterruptAsynchOnce (* As InterruptAsynch except that only a single interrupt
               is delivered asynchronously after which the interrupt state is changed to
               InterruptSynch.  It allows a thread to tidy up and if necessary indicate
               that it has been interrupted without the risk of a second asynchronous
               interrupt occurring in the handler for the first interrupt. *)
        
        (* fork: Fork a thread.  Starts a new thread running the function argument.  The
           attribute list gives initial values for thread attributes which can be
           modified by the thread itself.  Any unspecified attributes take default values.
           Raises Thread if the same attribute appears more than once in the list.
           The thread is terminated when the thread function returns, if it
           raises an uncaught exception or if it calls "exit". *)
        val fork: (unit->unit) * threadAttribute list -> thread
        (* exit: Terminate this thread. *)
        val exit: unit -> unit
        (* isActive: Test if a thread is still running or has terminated. *)
        val isActive: thread -> bool
        
        (* Test whether thread ids are the same. *)
        val equal: thread * thread -> bool
        (* Get my own ID. *)
        val self: unit -> thread
        
        exception Interrupt (* = SML90.Interrupt *)
        (* Send an Interrupt exception to a specific thread.  When and indeed whether
           the exception is actually delivered will depend on the interrupt state
           of the target thread.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val interrupt: thread -> unit
        (* Send an interrupt exception to every thread which is set to accept it. *)
        val broadcastInterrupt: unit -> unit
        (* If this thread is handling interrupts synchronously, test to see 
           if it has been interrupted.  If so it raises the Interrupt
           exception. *)
        val testInterrupt: unit -> unit
        (* Terminate a thread. This should be used as a last resort.  Normally
           a thread should be allowed to clean up and terminate by using the
           interrupt call.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val kill: thread -> unit
        
        (* Get and set thread-local store for the calling thread. The store is a
           tagged associative memory which is initially empty for a new thread.
           A thread can call setLocal to add or replace items in its store and
           call getLocal to return values if they exist.  The Universal structure
           contains functions to make new tags as well as injection, projection and
           test functions. *)
        val getLocal: 'a Universal.tag -> 'a option
        val setLocal: 'a Universal.tag * 'a -> unit
        
        (* Change the specified attribute(s) for the calling thread.  Unspecified
           attributes remain unchanged.  Raises Thread if the same attribute
           appears more than once in the list. *)
        val setAttributes: threadAttribute list -> unit
        (* Get the values of the attributes of the calling thread. *)
        val getAttributes: unit -> threadAttribute list

        (* Return the number of processors that will be used to run threads. *)
        val numProcessors: unit -> int
    end
        
    structure Mutex:
    sig
        (* Mutexes.  A mutex provides simple mutual exclusion.  A thread can lock
           a mutex and until it unlocks it no other thread will be able to lock it.
           Locking and unlocking are intended to be fast in the situation when
           there is no other thread attempting to lock the mutex.  *)
        type mutex
        (* mutex: Make a new mutex *)
        val mutex: unit -> mutex
        (* lock:  Lock a mutex.  If the mutex is currently locked the thread is
           blocked until it is unlocked.  If a thread tries to lock a mutex that
           it has previously locked the thread will deadlock.
           N.B.  "lock" is not an interruption point (a point where synchronous
           interrupts are delivered) even though a thread can be blocked indefinitely. *)
        val lock: mutex -> unit
        (* unlock:  Unlock a mutex and allow any waiting threads to run.  The behaviour
           if the mutex was not previously locked by the calling thread is undefined.  *)
        val unlock: mutex -> unit
        (* trylock: Attempt to lock the mutex.  Returns true if the mutex was not
           previously locked and has now been locked by the calling thread.  Returns
           false if the mutex was previously locked, including by the calling thread. *)
        val trylock: mutex -> bool
        
        (* The functions in this structure may not work correctly if an asynchronous
           interrupt is delivered during the calls.  A thread should handle interrupts
           synchronously when using these calls. *)
    end
    
    structure ConditionVar:
    sig
        (* Condition variables.  Condition variables are used to provide communication
           between threads.  A condition variable is used in conjunction with a mutex
           and usually a reference to establish and test changes in state.  The normal
           use is for one thread to lock a mutex, test the reference and then wait on
           the condition variable, releasing the lock on the mutex while it does so.
           Another thread may then lock the mutex, update the reference, unlock the
           mutex, and signal the condition variable.  This wakes up the first thread
           and reacquires the lock allowing the thread to test the updated reference
           with the lock held.
           More complex communication mechanisms, such as blocking channels, can
           be written in terms of condition variables. *)
        type conditionVar
        (* conditionVar: Make a new condition variable. *)
        val conditionVar: unit -> conditionVar
        (* wait: Release the mutex and block until the condition variable is
           signalled.  When wait returns the mutex has been re-acquired.
           If the thread is handling interrupts synchronously a call to "wait" may
           cause an Interrupt exception to be delivered ("wait" is an interruption
           point).
           (The implementation must ensure that if an Interrupt is delivered as well
           as signal waking up a single thread that the interrupted thread does not
           consume the "signal".)
           The mutex is (re)acquired before Interrupt is delivered.
           NOTE(review): the implementation in this file re-locks the mutex in its
           exception handler only when the caller's original interrupt state was
           InterruptSynch or InterruptDefer; for InterruptAsynch and
           InterruptAsynchOnce it re-raises without re-locking.  Confirm which
           behaviour is intended.  *)
        val wait: conditionVar * Mutex.mutex -> unit
        (* waitUntil: As wait except that it blocks until either the condition
           variable is signalled or the time (absolute) is reached.  Either way
           the mutex is reacquired so there may be a further delay if it is held
           by another thread.
           Returns true if the thread was woken by a signal or broadcast and false
           if the time was reached first.  A time equal to Time.zeroTime is treated
           as already past: the call returns false at once without releasing the
           mutex.  *)
        val waitUntil: conditionVar * Mutex.mutex * Time.time -> bool
        (* signal: Wake up one thread if any are waiting on the condition variable. *)
        val signal: conditionVar -> unit
        (* broadcast: Wake up all threads waiting on the condition variable. *)
        val broadcast: conditionVar -> unit
    end

end;

structure Thread :> THREAD =
struct
    open RuntimeCalls (* for POLY_SYS and EXC numbers *)
    
    local
        (* Build the exception from RunCall.Run_exception1 using the identifier
           EXC_thread; it carries a string argument. *)
        structure ThreadExn =
            RunCall.Run_exception1(
                type ex_type = string
                val ex_iden = EXC_thread
            )
    in
        (* Thread: raised by this library when an operation fails. *)
        exception Thread = ThreadExn.ex
    end
    
    local
        (* Create non-overwritable mutables for mutexes and condition variables.
           A non-overwritable mutable in the executable or a saved state is not
           overwritten when a saved state further down the hierarchy is loaded. *)
        (* Flags word passed to the allocator to request such a cell. *)
        val F_non_overwrite_mutable : word = 0wx48
        (* Raw store allocation in the RTS.  The arguments are presumably
           (length in words, flags, initial value) - confirm against the RTS. *)
        val System_alloc: word*word*word->word  =
            RunCall.run_call3 POLY_SYS_alloc_store
    in
        (* nvref a: like "ref a" but the cell is allocated through System_alloc
           with the F_non_overwrite_mutable flags.  The casts are needed because
           System_alloc is typed purely in terms of words. *)
        fun nvref (a: 'a) : 'a ref =
            RunCall.unsafeCast
                (System_alloc(0w1, F_non_overwrite_mutable, RunCall.unsafeCast a))
    end
    
    structure Thread =
    struct
        
        (* Thread attributes and interrupt states.  These must match the datatype
           specification in the THREAD signature.  attrsToWord, setIstateBits,
           getIstateBits and wordToAttrs convert between these values and the
           attribute word held in the thread object. *)
        datatype threadAttribute =
            EnableBroadcastInterrupt of bool (* Bit 0w1 of the attribute word. *)
        |   InterruptState of interruptState (* The 0w6 bits of the attribute word. *)
        
        and interruptState =
            InterruptDefer (* Encoded as 0w0. *)
        |   InterruptSynch (* Encoded as 0w2. *)
        |   InterruptAsynch (* Encoded as 0w4. *)
        |   InterruptAsynchOnce (* Encoded as 0w6. *)

        (* attrsToWord: turn an attribute list into a pair (bits, mask).  "mask"
           has the bits of every attribute mentioned in the list and "bits" has
           the values requested for them.  Raises Thread if an attribute is
           given more than once. *)
        fun attrsToWord (at: threadAttribute list): Word.word * Word.word =
        let
            (* The value bits and the mask contributed by a single attribute. *)
            fun encode (EnableBroadcastInterrupt flag) =
                    (if flag then 0w1 else 0w0, 0w1)
              | encode (InterruptState s) = (setIstateBits s, 0w6)

            (* Work along the list from the left.  "seen" is the mask of the
               attributes met so far; meeting one of them again is an error. *)
            fun loop ([], bits, seen) = (bits, seen)
              | loop (attr :: rest, bits, seen) =
                let
                    val (attrBits, attrMask) = encode attr
                in
                    if Word.andb(seen, attrMask) = 0w0
                    then loop(rest, Word.orb(attrBits, bits), Word.orb(seen, attrMask))
                    else raise Thread "The same attribute appears more than once in the list"
                end
        in
            loop(at, 0w0, 0w0)
        end
        
        (* setIstateBits: the encoding of an interrupt state within the 0w6 bits. *)
        and setIstateBits state =
            case state of
                InterruptDefer => 0w0
            |   InterruptSynch => 0w2
            |   InterruptAsynch => 0w4
            |   InterruptAsynchOnce => 0w6

        (* getIstateBits: decode the interrupt state held in the 0w6 bits of an
           attribute word.  This is the inverse of setIstateBits. *)
        fun getIstateBits(w: Word.word): interruptState =
            case Word.andb(w, 0w6) of
                0w0 => InterruptDefer
            |   0w2 => InterruptSynch
            |   0w4 => InterruptAsynch
            |   _ => InterruptAsynchOnce

        (* wordToAttrs: decode an attribute word into the full attribute list:
           the broadcast setting (bottom bit) followed by the interrupt state. *)
        fun wordToAttrs w =
            [
                EnableBroadcastInterrupt(Word.andb(w, 0w1) <> 0w0),
                InterruptState(getIstateBits w)
            ]
        
        local
            (* Build the argument-less exception from RunCall.Run_exception0
               using the identifier EXC_interrupt. *)
            structure InterruptExn =
                RunCall.Run_exception0(
                    val ex_iden = RuntimeCalls.EXC_interrupt
                )
        in
            exception Interrupt = InterruptExn.ex
        end

        (* The thread id is opaque outside this structure but is actually a four
           word mutable object.
           Word 0: Index into thread table (used inside the RTS only)
           Word 1: Flags: initialised by the RTS and set by this code
           Word 2: Thread local store: read and set by this code.
           Word 3: Interrupt request: read by testInterrupt, which treats a
                   non-zero value as a pending request. *)
        type thread = Word.word ref (* Actually this is a four word mutable object. *)
        (* Equality is pointer equality. *)
        val equal : thread*thread->bool = RunCall.run_call2 POLY_SYS_word_eq
        (* Return our own thread object. *)
        val self: unit->thread = RunCall.run_call0 POLY_SYS_thread_self
        
        (* getLocal: look up tag t in the calling thread's local store (word 2
           of the thread object).  Returns SOME of the first entry carrying
           the tag, or NONE if there is none. *)
        fun getLocal (t: 'a Universal.tag) : 'a option =
        let
            val store: Universal.universal ref list =
                RunCall.run_call2 POLY_SYS_load_word(self(), 2)

            fun search [] = NONE
              | search (cell :: rest) =
                let
                    val entry = !cell
                in
                    if Universal.tagIs t entry
                    then SOME(Universal.tagProject t entry)
                    else search rest
                end
        in
            search store
        end
        
        (* setLocal: store newVal under tag t in the calling thread's local
           store.  An existing entry for the tag is updated in place; otherwise
           a new entry is put on the front of the list and the list is written
           back to word 2 of the thread object. *)
        fun setLocal (t: 'a Universal.tag, newVal: 'a) : unit =
        let
            val store: Universal.universal ref list =
                RunCall.run_call2 POLY_SYS_load_word(self(), 2)

            val boxed = Universal.tagInject t newVal

            fun update [] =
                    (* No entry for this tag yet. *)
                    RunCall.run_call3 POLY_SYS_assign_word
                        (self(), 2, ref boxed :: store)
              | update (cell :: rest) =
                    if Universal.tagIs t (!cell)
                    then cell := boxed
                    else update rest
        in
            update store
        end
        
        local
            val dispatch: int*unit->unit = RunCall.run_call2 POLY_SYS_thread_dispatch
            (* Word 3 of our thread object: non-zero if a request is pending. *)
            fun pendingRequest () : int =
                RunCall.run_call2 POLY_SYS_load_word(self(), 3)
        in
            (* testInterrupt: only enter the RTS (dispatch code 11) when the
               thread object shows a pending request; otherwise do nothing. *)
            fun testInterrupt() =
                case pendingRequest () of
                    0 => ()
                |   _ => dispatch(11, ())
        end

        local
            (* Read and write word 1 of our own thread object, which holds the
               attribute flags. *)
            fun getAttrWord () : Word.word =
                RunCall.run_call2 POLY_SYS_load_word(self(), 1)

            fun putAttrWord (w: Word.word) : unit =
                RunCall.run_call3 POLY_SYS_assign_word (self(), 1, w)

            (* Clear the bits of "old" selected by "mask" and or in "fresh". *)
            fun overlay (fresh: Word.word, mask: Word.word, old: Word.word) : Word.word =
                Word.orb(fresh, Word.andb(Word.notb mask, old))
        in
            (* setAttributes: change just the attributes named in the list and
               leave the others as they were. *)
            fun setAttributes (attrs: threadAttribute list) : unit =
            let
                val current = getAttrWord ()
                val (newValue, mask) = attrsToWord attrs
                val () = putAttrWord(overlay(newValue, mask, current))
            in
                (* If the new state is one of the asynchronous ones check for
                   an interrupt that is already pending.  There can only be
                   one if we were previously handling interrupts
                   synchronously or deferring them. *)
                case Word.andb(newValue, 0w4) of
                    0w4 => testInterrupt()
                |   _ => ()
            end

            (* getAttributes: the current attributes of the calling thread. *)
            fun getAttributes() : threadAttribute list = wordToAttrs(getAttrWord())

            (* Used by the ConditionVar structure: read or replace only the
               interrupt-state bits of the attribute word. *)
            fun getInterruptState(): interruptState = getIstateBits(getAttrWord())
            and setInterruptState(s: interruptState): unit =
                putAttrWord(overlay(setIstateBits s, 0w6, getAttrWord ()))

        end

        (* exit: terminate the calling thread.  Bound directly to the RTS call
           POLY_SYS_kill_self. *)
        val exit: unit -> unit = RunCall.run_call0 POLY_SYS_kill_self

        local
            val dispatch = RunCall.run_call2 POLY_SYS_thread_dispatch
            (* A new thread ignores broadcasts and handles explicit interrupts
               synchronously unless told otherwise. *)
            val defaultAttrs =
                #1 (attrsToWord[EnableBroadcastInterrupt false, InterruptState InterruptSynch])
        in
            (* fork: start a thread running f (dispatch code 7).  Attributes
               given in attrs take precedence over the defaults.  Raises Thread
               if attrs names the same attribute more than once. *)
            fun fork(f:unit->unit, attrs: threadAttribute list): thread =
            let
                val (explicit, mask) = attrsToWord attrs
                (* The defaults for everything the caller did not mention. *)
                val inherited = Word.andb(Word.notb mask, defaultAttrs)
            in
                dispatch(7, (f, Word.orb(explicit, inherited)))
            end
        end
        
        local
            val isActiveRequest = 8 (* Thread dispatch code used by isActive. *)
            val dispatch: int*thread->bool = RunCall.run_call2 POLY_SYS_thread_dispatch
        in
            (* isActive: ask the RTS about the state of thread t. *)
            fun isActive(t: thread): bool = dispatch(isActiveRequest, t)
        end
        
        local
            val broadcastRequest = 10 (* Thread dispatch code used by broadcastInterrupt. *)
            val dispatch: int*unit->unit = RunCall.run_call2 POLY_SYS_thread_dispatch
        in
            (* broadcastInterrupt: hand the broadcast request to the RTS. *)
            fun broadcastInterrupt() = dispatch(broadcastRequest, ())
        end

        local
            val interruptRequest = 9 (* Thread dispatch code used by interrupt. *)
            val killRequest = 12 (* Thread dispatch code used by kill. *)
            val dispatch: int*thread->unit = RunCall.run_call2 POLY_SYS_thread_dispatch
        in
            (* kill and interrupt: pass the request for thread t to the RTS. *)
            fun kill(t: thread) = dispatch(killRequest, t)
            and interrupt(t: thread) = dispatch(interruptRequest, t)
        end

        local
            val processorsRequest = 13 (* Thread dispatch code used by numProcessors. *)
            val dispatch: int*int->int = RunCall.run_call2 POLY_SYS_thread_dispatch
        in
            (* numProcessors: ask the RTS for the processor count.  The second
               argument is always 0. *)
            fun numProcessors():int = dispatch(processorsRequest, 0)
        end
    end
    
    structure Mutex =
    struct
        (* A mutex is a word cell: 0w1 means unlocked.  See lock and unlock for
           how the other values are used. *)
        type mutex = Word.word ref
        (* mutex: make a new mutex in a non-overwritable cell (see nvref). *)
        fun mutex() = nvref 0w1; (* Initially unlocked. *)
        (* Atomic increment and decrement of the cell.  The callers in this
           structure use the result as the new value of the cell. *)
        val atomicIncr: Word.word ref -> Word.word = RunCall.run_call1 POLY_SYS_atomic_incr
        and atomicDecr: Word.word ref -> Word.word = RunCall.run_call1 POLY_SYS_atomic_decr

        (* Thread dispatch call into the RTS taking a mutex.  lock uses code 1
           and unlock uses code 2. *)
        val doCall: int * mutex -> unit = RunCall.run_call2 POLY_SYS_thread_dispatch

        (* A mutex is implemented as a Word.word ref.  It is initially set to 1 and locked
           by atomically decrementing it.  If it was previously unlocked the result will
           be zero but if it was already locked it will be some negative value.  When it
           is unlocked it is atomically incremented.  If there was no contention the result
           will again be 1 but if some other thread tried to lock it the result will be
           zero or negative.  In that case the unlocking thread needs to call in to the
           RTS to wake up the blocked thread.

           The cost of contention on the lock is very high.  To try to avoid this we
           first loop (spin) to see if we can get the lock without contention.  *)

        (* The maximum number of times spin looks at the mutex before giving up. *)
        val spin_cycle = 20000
        (* spin: return as soon as the mutex looks unlocked or once the count c
           has reached spin_cycle, whichever comes first. *)
        fun spin (m: mutex, c: int) =
           if ! m = 0w1 orelse c = spin_cycle
           then ()
           else spin(m, c+1);

        (* lock: spin for a while in the hope that the mutex becomes free, then
           decrement it.  A result of zero means we now hold the lock.  Any
           other result means it was already locked, so we call into the RTS
           (code 1) and, when that returns, start again from the top. *)
        fun lock (m: mutex): unit =
        (
            spin(m, 0);
            case atomicDecr m of
                0w0 => () (* The lock is ours. *)
            |   _ => (doCall(1, m); lock m)
        )

        (* unlock: increment the mutex.  A result of 1 means nobody else tried
           to take it while we held it.  Otherwise another thread has blocked:
           we set the value straight back to 1, which releases the lock, and
           call into the RTS (code 2) to release the blocked threads.  It does
           not matter if some other thread takes the lock before they have
           been woken; and a thread that decremented the count and found it
           still locked will enter the RTS and try to acquire the lock there. *)
        fun unlock (m: mutex): unit =
            case atomicIncr m of
                0w1 => () (* No contention. *)
            |   _ => (m := 0w1; doCall(2, m))

        (* trylock: lock the mutex if it is free and say whether we did.  We
           never block, so the holder may well release the lock just after we
           have returned false.
           Looking at !m first is only an optimisation: nearly always it saves
           an atomicDecr that would leave m negative.  Another thread can still
           take the mutex between the test and the decrement.  Then atomicDecr
           returns a negative value, we return false, and the thread holding
           the mutex has to call into the RTS to reset it when it unlocks. *)
        fun trylock (m: mutex): bool =
            !m = 0w1 andalso atomicDecr m = 0w0
    end

    structure ConditionVar =
    struct
        open Thread

        (* A condition variable contains a lock and a list of suspended threads.
           The lock is internal: it protects the thread list and is distinct from
           the mutex the caller passes to wait. *)
        type conditionVar = { lock: Mutex.mutex, threads: thread list ref }
        (* conditionVar: make a new condition variable with no waiting threads.
           The list cell is allocated with nvref, like the mutex. *)
        fun conditionVar(): conditionVar =
            { lock = Mutex.mutex(), threads = nvref nil }

        (* To avoid duplicating the code we use zero to represent an infinite wait.
           Since that's a valid time in the past we check that it isn't used in
           waitUntil before doing anything else. *)
        val infinity = Time.zeroTime;

        local
            val doCall = RunCall.run_call2 POLY_SYS_thread_dispatch
            (* Thread dispatch code 3.  Takes the internal lock and the wake-up
               time ("infinity" for no time limit). *)
            fun Sleep(mt: Mutex.mutex * Time.time): unit = doCall(3, mt)
        in
            (* innerWait: put the calling thread on the waiting list of the
               condition variable, release the external mutex m and sleep until
               the thread has been taken off the list by signal or broadcast
               (result true) or the time t has been reached (result false).
               Propagates any exception raised by testInterrupt after removing
               the thread from the list.  The external mutex is NOT reacquired
               here: that is left to doWait. *)
            fun innerWait({lock, threads}: conditionVar, m: Mutex.mutex, t: Time.time) : bool =
            let
                val me = self() (* My thread id. *)
                
                (* One sleep/check cycle.  Entered with the internal lock held and
                   always returns or raises with it released. *)
                fun waitAgain() =
                let
                    (* Is this thread in the list? *)
                    fun doFind [] = false | doFind(h::t) = equal(h, me) orelse doFind t
                    
                    (* The list without the first occurrence of this thread. *)
                    fun removeThis [] = raise Fail "Thread missing in list"
                     |  removeThis (h::t) = if equal(h, me) then t else h :: removeThis t
                     
                    val () = Sleep(lock, t) (* Atomically release the lock and wait. *)

                    val () = Mutex.lock lock (* Get the lock again.  *)
                    
                    (* Are we still on the list?  If so we haven't been explicitly woken
                       up.  We've either timed out, been interrupted or simply returned
                       because the RTS needed to process some asynchronous results.  *)
                    val stillThere = doFind(!threads)
                in
                    if not stillThere
                    then (* We're done. *)
                    (
                        Mutex.unlock lock;
                        true
                    )
                    else if t <> infinity andalso Time.now() >= t
                    then (* We've timed out. *)
                    (
                        threads := removeThis(! threads);
                        Mutex.unlock lock;
                        false
                    )
                    else
                    (
                        (* See if we've been interrupted.  If so remove ourselves
                           and exit. *)
                        testInterrupt()
                            handle exn => (threads := removeThis(! threads); Mutex.unlock lock; raise exn);
                        (* Otherwise just keep waiting. *)
                        waitAgain()
                    )
                end  
            in
                (* The internal lock is taken before the external mutex is
                   released and is still held when Sleep is called. *)
                Mutex.lock lock; (* Lock the internal mutex. *)
                Mutex.unlock m; (* Unlock the external mutex *)
                threads := me :: !threads; (* Add ourselves to the list. *)
                waitAgain() (* Wait and return the result when we're done. *)
            end

            (* doWait: the common part of wait and waitUntil.  Runs innerWait with
               interrupts handled synchronously (unless they are being deferred),
               then restores the caller's interrupt state and reacquires the
               external mutex m.  The result is that of innerWait. *)
            fun doWait(c: conditionVar, m: Mutex.mutex, t: Time.time) : bool =
            let
                val originalIntstate = getInterruptState()
                (* Set this to handle interrupts synchronously unless we're already
                   ignoring them. *)
                val () =
                    if originalIntstate = InterruptDefer
                    then ()
                    else setInterruptState InterruptSynch;
                    
                (* Wait for the condition.  If it raises an exception we still
                   need to reacquire the lock unless we were handling interrupts
                   asynchronously. *)
                val result =
                    innerWait(c, m, t) handle exn =>
                        (
                            (* We had an exception.  If we were handling exceptions synchronously
                               we reacquire the lock.  If it was set to InterruptAsynchOnce this
                               counts as a single asynchronous exception and we restore the
                               state as InterruptSynch. *)
                            (* N.B. In the two asynchronous cases the external mutex is
                               not reacquired before the exception is re-raised. *)
                            case originalIntstate of
                                InterruptDefer => (* Shouldn't happen?  *) Mutex.lock m
                            |   InterruptSynch => Mutex.lock m
                            |   InterruptAsynch => setInterruptState InterruptAsynch
                            |   InterruptAsynchOnce => setInterruptState InterruptSynch;

                            raise exn (* Reraise the exception*)
                        )
            in
                (* Restore the original interrupt state first. *)
                setInterruptState originalIntstate;
                (* Normal return.  Reacquire the lock before returning. *)
                Mutex.lock m;
                result
            end

            (* wait: wait with no time limit.  The result of doWait is discarded. *)
            fun wait(c: conditionVar, m: Mutex.mutex) : unit =
                (doWait(c, m, infinity); ())
            (* waitUntil: wait until signalled (true) or until the absolute time t
               (false).  The zero time is reserved to mean "no limit" internally so
               it is dealt with here: it returns false without waiting or releasing
               the mutex. *)
            and waitUntil(c: conditionVar, m: Mutex.mutex, t: Time.time) : bool =
                if t = infinity
                then false (* This has already happened. *)
                else doWait(c, m, t)
        end
        
        local
            val dispatch = RunCall.run_call2 POLY_SYS_thread_dispatch

            (* Ask the RTS (code 4) to wake thread t.  The result is false if
               the thread has already been interrupted and is not ignoring
               interrupts; otherwise the thread is woken and the result is
               true.  We need this because a thread that is interrupted before
               it is signalled is defined to raise Interrupt. *)
            fun doWake(t: thread): bool = dispatch(4, t)

            (* signal: wake the first thread that can be woken and return the
               list without it.  Threads that could not be woken stay in. *)
            fun wakeOne waiting =
                case waiting of
                    [] => []
                |   first :: others =>
                        if doWake first
                        then others
                        else first :: wakeOne others

            (* broadcast: try to wake every thread.  The result is always the
               empty list. *)
            fun wakeAll waiting =
                case waiting of
                    [] => []
                |   first :: others => (doWake first; wakeAll others)

            (* Apply wakeThreads to the waiting list with the internal lock
               held.  Interrupts are handled synchronously while this is done
               (unless they are being deferred) so that an asynchronous
               interrupt cannot leave the internal lock in an inconsistent
               state. *)
            fun signalOrBroadcast({lock, threads}: conditionVar, wakeThreads) : unit =
            let
                val originalState = getInterruptState()
                val wasAsynch =
                    case originalState of
                        InterruptAsynch => true
                    |   InterruptAsynchOnce => true
                    |   _ => false
                val () =
                    case originalState of
                        InterruptDefer => ()
                    |   _ => setInterruptState InterruptSynch
                val () = Mutex.lock lock
                val () = threads := wakeThreads(! threads)
                val () = Mutex.unlock lock
                val () = setInterruptState originalState (* Back to the caller's state. *)
            in
                (* An interrupt may have arrived while we were handling them
                   synchronously.  Deliver it now if the caller wanted
                   asynchronous delivery. *)
                if wasAsynch
                then testInterrupt()
                else ()
            end
        in
            fun signal cv = signalOrBroadcast(cv, wakeOne)
            and broadcast cv = signalOrBroadcast(cv, wakeAll)
        end
    end
end;