File: Thread.sml

package info (click to toggle)
polyml 5.7.1-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 40,616 kB
  • sloc: cpp: 44,142; ansic: 26,963; sh: 22,002; asm: 13,486; makefile: 602; exp: 525; python: 253; awk: 91
file content (725 lines) | stat: -rw-r--r-- 33,737 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
(*
    Title:      Thread package for ML.
    Author:     David C. J. Matthews
    Copyright (c) 2007-2014

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
    version 2.1 of the License, or (at your option) any later version.
    
    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.
    
    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*)

(* This signature and structure are not part of the standard basis library
   but are included here because they depend on the Time structure and are
   in turn dependencies of the BasicIO structure. *)

signature THREAD =
sig
    exception Thread of string (* Raised if an operation fails. *)
    
    structure Thread:
    sig
        eqtype thread
        
        (* Thread attributes - This may be extended. *)
        datatype threadAttribute =
            (* Does this thread accept a broadcast interrupt?  The default is not to
               accept broadcast interrupts. *)
            EnableBroadcastInterrupt of bool
            (* How to handle interrupts.  The default is to handle interrupts synchronously.  *)
        |   InterruptState of interruptState
            (* Maximum size of the ML stack in words. NONE means unlimited *)
        |   MaximumMLStack of int option
        
        and interruptState =
            InterruptDefer (* Defer any interrupts. *)
        |   InterruptSynch  (* Interrupts are delivered synchronously. An interrupt
               will be delayed until an interruption point.  An interruption point
               is one of: testInterrupt, ConditionVar.wait, ConditionVar.waitUntil
               and various library calls that may block, such as IO calls, pause etc.
               N.B.  Mutex.lock is not an interruption point even though it can result
               in a thread blocking for an indefinite period.  *)
        |   InterruptAsynch (* Interrupts are delivered asynchronously i.e. at a suitable
               point soon after they are triggered. *)
        |   InterruptAsynchOnce (* As InterruptAsynch except that only a single interrupt
               is delivered asynchronously after which the interrupt state is changed to
               InterruptSynch.  It allows a thread to tidy up and if necessary indicate
               that it has been interrupted without the risk of a second asynchronous
               interrupt occurring in the handler for the first interrupt. *)
        
        (* fork: Fork a thread.  Starts a new thread running the function argument.  The
           attribute list gives initial values for thread attributes which can be
           modified by the thread itself.  Any unspecified attributes take default values.
           The thread is terminated when the thread function returns, if it
           raises an uncaught exception or if it calls "exit". *)
        val fork: (unit->unit) * threadAttribute list -> thread
        (* exit: Terminate this thread. *)
        val exit: unit -> unit
        (* isActive: Test if a thread is still running or has terminated. *)
        val isActive: thread -> bool
        
        (* Test whether thread ids are the same.  No longer needed if this is an eqtype. *)
        val equal: thread * thread -> bool
        (* Get my own ID. *)
        val self: unit -> thread
        
        exception Interrupt (* = SML90.Interrupt *)
        (* Send an Interrupt exception to a specific thread.  When and indeed whether
           the exception is actually delivered will depend on the interrupt state
           of the target thread.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val interrupt: thread -> unit
        (* Send an interrupt exception to every thread which is set to accept it. *)
        val broadcastInterrupt: unit -> unit
        (* If this thread is handling interrupts synchronously, test to see 
           if it has been interrupted.  If so it raises the Interrupt
           exception. *)
        val testInterrupt: unit -> unit
        (* Terminate a thread. This should be used as a last resort.  Normally
           a thread should be allowed to clean up and terminate by using the
           interrupt call.  Raises Thread if the thread is no longer running,
           so an exception handler should be used unless the thread is known to
           be blocked. *)
        val kill: thread -> unit
        
        (* Get and set thread-local store for the calling thread. The store is a
           tagged associative memory which is initially empty for a new thread.
           A thread can call setLocal to add or replace items in its store and
           call getLocal to return values if they exist.  The Universal structure
           contains functions to make new tags as well as injection, projection and
           test functions. *)
        val getLocal: 'a Universal.tag -> 'a option
        val setLocal: 'a Universal.tag * 'a -> unit
        
        (* Change the specified attribute(s) for the calling thread.  Unspecified
           attributes remain unchanged. *)
        val setAttributes: threadAttribute list -> unit
        (* Get the values of attributes. *)
        val getAttributes: unit -> threadAttribute list

        (* Return the number of processors that will be used to run threads. *)
        val numProcessors: unit -> int
        (* and the number of physical processors if that is available. *)
        and numPhysicalProcessors: unit -> int option
    end
        
    structure Mutex:
    sig
        (* Mutexes.  A mutex provides simple mutual exclusion.  A thread can lock
           a mutex and until it unlocks it no other thread will be able to lock it.
           Locking and unlocking are intended to be fast in the situation when
           there is no other process attempting to lock the mutex.  *)
        type mutex
        (* mutex: Make a new mutex *)
        val mutex: unit -> mutex
        (* lock:  Lock a mutex.  If the mutex is currently locked the thread is
           blocked until it is unlocked.  If a thread tries to lock a mutex that
           it has previously locked the thread will deadlock.
           N.B.  "lock" is not an interruption point (a point where synchronous
           interrupts are delivered) even though a thread can be blocked indefinitely. *)
        val lock: mutex -> unit
        (* unlock:  Unlock a mutex and allow any waiting threads to run.  The behaviour
           if the mutex was not previously locked by the calling thread is undefined.  *)
        val unlock: mutex -> unit
        (* trylock: Attempt to lock the mutex.  Returns true if the mutex was not
           previously locked and has now been locked by the calling thread.  Returns
           false if the mutex was previously locked, including by the calling thread. *)
        val trylock: mutex -> bool
        
        (* These functions may not work correctly if an asynchronous interrupt
           is delivered during the calls.  A thread should use synchronous interrupt
           when using these calls. *)
    end
    
    structure ConditionVar:
    sig
        (* Condition variables.  Condition variables are used to provide communication
           between threads.  A condition variable is used in conjunction with a mutex
           and usually a reference to establish and test changes in state.  The normal
           use is for one thread to lock a mutex, test the reference and then wait on
           the condition variable, releasing the lock on the mutex while it does so.
           Another thread may then lock the mutex, update the reference, unlock the
           mutex, and signal the condition variable.  This wakes up the first thread
           and reacquires the lock allowing the thread to test the updated reference
           with the lock held.
           More complex communication mechanisms, such as blocking channels, can
           be written in terms of condition variables. *)
        type conditionVar
        (* conditionVar: Make a new condition variable. *)
        val conditionVar: unit -> conditionVar
        (* wait: Release the mutex and block until the condition variable is
           signalled.  When wait returns the mutex has been re-acquired.
           If thread is handling interrupts synchronously a call to "wait" may cause
           an Interrupt exception to be delivered.
           (The implementation must ensure that if an Interrupt is delivered as well
           as signal waking up a single thread that the interrupted thread does not
           consume the "signal".)
           The mutex is (re)acquired before Interrupt is delivered.  *)
        val wait: conditionVar * Mutex.mutex -> unit
        (* waitUntil: As wait except that it blocks until either the condition
           variable is signalled or the time (absolute) is reached.  Either way
           the mutex is reacquired so there may be a further delay if it is held
           by another thread.  *)
        val waitUntil: conditionVar * Mutex.mutex * Time.time -> bool
        (* signal: Wake up one thread if any are waiting on the condition variable. *)
        val signal: conditionVar -> unit
        (* broadcast: Wake up all threads waiting on the condition variable. *)
        val broadcast: conditionVar -> unit
    end

end;

structure Thread :> THREAD =
struct
    exception Thread = RunCall.Thread
    
    (* Create non-overwritable mutables for mutexes and condition variables.
       A non-overwritable mutable in the executable or a saved state is not
       overwritten when a saved state further down the hierarchy is loaded. *)
    val nvref = LibrarySupport.noOverwriteRef
    
    structure Thread =
    struct
        open Thread (* Created in INITIALISE with thread type and self function. *)

        (* Equality is pointer equality. *)
        val equal : thread*thread->bool = op =

        datatype threadAttribute =
            EnableBroadcastInterrupt of bool
        |   InterruptState of interruptState
        |   MaximumMLStack of int option
        
        and interruptState =
            InterruptDefer
        |   InterruptSynch
        |   InterruptAsynch
        |   InterruptAsynchOnce 

        (* Convert attributes to bits and a mask. *)
        fun attrsToWord (at: threadAttribute list): Word.word * Word.word =
        let
            (* Check that a particular attribute appears only once.
               As well as accumulating the actual bits in the result we
               also accumulate the mask of bits.  If any of these
               reappear we raise an exception. *)
            fun checkRepeat(r, acc, set, mask) =
                if Word.andb(set, mask) <> 0w0
                then raise Thread "The same attribute appears more than once in the list"
                else convert(r, acc,  Word.orb(set, mask))

            and convert([], acc, set) = (acc, set)
              | convert(EnableBroadcastInterrupt true :: r, acc, set) =
                    checkRepeat(r, Word.orb(acc, 0w1), set, 0w1)
              | convert(EnableBroadcastInterrupt false :: r, acc, set) =
                    checkRepeat(r, acc (* No bit *), set, 0w1)
              | convert(InterruptState s :: r, acc, set) =
                    checkRepeat(r, Word.orb(setIstateBits s, acc), set, 0w6)
              | convert(MaximumMLStack _ :: r, acc, set) =
                    convert(r, acc, set)
        in
            convert(at, 0w0, 0w0)
        end
        
        and setIstateBits InterruptDefer = 0w0
          | setIstateBits InterruptSynch = 0w2
          | setIstateBits InterruptAsynch = 0w4
          | setIstateBits InterruptAsynchOnce = 0w6

        fun getIstateBits(w: Word.word): interruptState =
        let
            val ibits = Word.andb(w, 0w6)
        in
            if ibits = 0w0
            then InterruptDefer
            else if ibits = 0w2
            then InterruptSynch
            else if ibits = 0w4
            then InterruptAsynch
            else InterruptAsynchOnce
        end

        fun wordToAttrs w =
        let
            (* Enable broadcast - true if bottom bit is set. *)
            val bcast = EnableBroadcastInterrupt(Word.andb(w, 0w1) = 0w1)
        in
            [bcast, InterruptState(getIstateBits w)]
        end
        
        exception Interrupt = RunCall.Interrupt

        (* The thread id is opaque outside this structure but is actually a six
           word mutable object.
           Word 0: Index into thread table (used inside the RTS only)
           Word 1: Flags: initialised by the RTS and set by this code
           Word 2: Thread local store: read and set by this code.
           Word 3: IntRequest: Set by the RTS if there is an interrupt pending
           Word 4: Maximum ML stack size.  Unlimited is stored here as zero
           *)
        val threadIdFlags       = 0w1
        and threadIdThreadLocal = 0w2
        and threadIdIntRequest  = 0w3
        and threadIdStackSize   = 0w4

        fun getLocal (t: 'a Universal.tag) : 'a option =
        let
            val root: Universal.universal ref list =
                RunCall.loadWord(self(), threadIdThreadLocal)

            fun doFind [] = NONE
              | doFind ((ref v)::r) =
                    if Universal.tagIs t v
                    then SOME(Universal.tagProject t v)
                    else doFind r
        in
            doFind root
        end
        
        fun setLocal (t: 'a Universal.tag, newVal: 'a) : unit =
        let
            (* See if we already have this in the list. *)
            val root: Universal.universal ref list =
                RunCall.loadWord(self(), threadIdThreadLocal)

            fun doFind [] =
                    (* Not in the list - Add it. *)
                    RunCall.storeWord
                        (self(), threadIdThreadLocal,
                         ref (Universal.tagInject t newVal) :: root)
              | doFind (v::r) =
                    if Universal.tagIs t (!v)
                        (* If it's in the list update it. *)
                    then v := Universal.tagInject t newVal
                    else doFind r

        in
            doFind root
        end
        
        local
            val threadTestInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadTestInterrupt"
        in
            fun testInterrupt() =
                (* If there is a pending request the word in the thread object
                   will be non-zero. *)
                if RunCall.loadWord(self(), threadIdIntRequest) <> 0
                then threadTestInterrupt()
                else ()
        end

        local
            fun getAttrWord (me: thread) : Word.word =
                RunCall.loadWord(me, threadIdFlags)

            fun getStackSizeAsInt (me: thread) : int =
                RunCall.loadWord(me, threadIdStackSize)

            and getStackSize me : int option =
                case getStackSizeAsInt me of
                    0 => NONE
                |   s => SOME s

            fun newStackSize ([], default) = default
            |   newStackSize (MaximumMLStack NONE :: _, _) = 0
            |   newStackSize (MaximumMLStack (SOME n) :: _, _) =
                    if n <= 0 then raise Thread "The stack size must be greater than zero" else n
            |   newStackSize (_ :: l, default) = newStackSize (l, default)
            
            val threadMaxStackSize: int -> unit = RunCall.rtsCallFull1 "PolyThreadMaxStackSize"
        in
            (* Set attributes.  Only changes the values that are specified.  The
               others remain the same. *)
            fun setAttributes (attrs: threadAttribute list) : unit =
            let
                val me = self()
                val oldValues: Word.word = getAttrWord me
                val (newValue, mask) = attrsToWord attrs
                val stack = newStackSize(attrs, getStackSizeAsInt me)
            in
                RunCall.storeWord (self(), threadIdFlags,
                    Word.orb(newValue, Word.andb(Word.notb mask, oldValues)));
                if stack = getStackSizeAsInt me
                then () else threadMaxStackSize stack;
                (* If we are now handling interrupts asynchronously check whether
                   we have a pending interrupt now.  This will only be effective
                   if we were previously handling them synchronously or blocking
                   them. *)
                if Word.andb(newValue, 0w4) = 0w4
                then testInterrupt()
                else ()
            end
                
            fun getAttributes() : threadAttribute list =
            let
                val me = self()
            in
                MaximumMLStack (getStackSize me) :: wordToAttrs(getAttrWord me)
            end

            (* These are used in the ConditionVar structure.  They affect only the
               interrupt handling bits. *)
            fun getInterruptState(): interruptState = getIstateBits(getAttrWord(self()))
            and setInterruptState(s: interruptState): unit =
                RunCall.storeWord (self(), threadIdFlags,
                    Word.orb(setIstateBits s, Word.andb(Word.notb 0w6, getAttrWord(self()))))

            local
                (* The default for a new thread is to ignore broadcasts and handle explicit
                   interrupts synchronously. *)
                val (defaultAttrs, _) =
                    attrsToWord[EnableBroadcastInterrupt false, InterruptState InterruptSynch]
                val threadForkFunction:
                    (unit->unit) * word * int -> thread = RunCall.rtsCallFull3 "PolyThreadForkThread"
            in
                fun fork(f:unit->unit, attrs: threadAttribute list): thread =
                let
                    (* Any attributes specified explicitly override the defaults. *)
                    val (attrWord, mask) = attrsToWord attrs
                    val attrValue = Word.orb(attrWord, Word.andb(Word.notb mask, defaultAttrs))
                    val stack = newStackSize(attrs, 0 (* Default is unlimited *))
                in
                    threadForkFunction(f, attrValue, stack)
                end
            end
        end

        val exit: unit -> unit = RunCall.rtsCallFull0 "PolyThreadKillSelf"
        and isActive: thread -> bool = RunCall.rtsCallFast1 "PolyThreadIsActive"
        and broadcastInterrupt: unit -> unit = RunCall.rtsCallFull0 "PolyThreadBroadcastInterrupt"

        local
            (* Send an interrupt to a thread.  If it returns false
               the thread did not exist and this should raise an exception. *)
            val threadSendInterrupt: thread -> bool = RunCall.rtsCallFast1 "PolyThreadInterruptThread"
        in
            fun interrupt(t: thread) =
                if threadSendInterrupt t
                then ()
                else raise Thread "Thread does not exist"
        end

        local
            val threadKillThread: thread -> bool = RunCall.rtsCallFast1 "PolyThreadKillThread"
        in
            fun kill(t: thread) =
                if threadKillThread t
                then ()
                else raise Thread "Thread does not exist"
        end

        val numProcessors: unit -> int = RunCall.rtsCallFast0 "PolyThreadNumProcessors"

        local
            val numberOfPhysical: unit -> int =
                RunCall.rtsCallFast0 "PolyThreadNumPhysicalProcessors"
        in
            fun numPhysicalProcessors(): int option =
                (* It is not always possible to get this information *)
                case numberOfPhysical() of 0 => NONE | n => SOME n
        end
    end
    
    structure Mutex =
    struct
        type mutex = Word.word ref
        fun mutex() = nvref 0w1; (* Initially unlocked. *)
        open Thread  (* atomicIncr, atomicDecr and atomicReset are set up by Initialise. *)
        
        val threadMutexBlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexBlock"
        val threadMutexUnlock: mutex -> unit = RunCall.rtsCallFull1 "PolyThreadMutexUnlock"

        (* A mutex is implemented as a Word.word ref.  It is initially set to 1 and locked
           by atomically decrementing it.  If it was previously unlocked the result will
           by zero but if it was already locked it will be some negative value.  When it
           is unlocked it is atomically incremented.  If there was no contention the result
           will again be 1 but if some other thread tried to lock it the result will be
           zero or negative.  In that case the unlocking thread needs to call in to the
           RTS to wake up the blocked thread.

           The cost of contention on the lock is very high.  To try to avoid this we
           first loop (spin) to see if we can get the lock without contention.  *)

        val spin_cycle = 20000
        fun spin (m: mutex, c: int) =
           if ! m = 0w1 then ()
           else if c = spin_cycle then ()
           else spin(m, c+1);

        fun lock (m: mutex): unit =
        let
            val () = spin(m, 0)
            val newValue = atomicDecr m
        in
            if newValue = 0w0
            then () (* We've acquired the lock. *)
            else (* It's locked.  We return when we have the lock. *)
            (
                threadMutexBlock m;
                lock m (* Try again. *)
            )
        end

        fun unlock (m: mutex): unit =
        let
            val newValue = atomicIncr m
        in
            if newValue = 0w1
            then () (* No contention. *)
            else
                (* Another thread has blocked and we have to release it.  We can safely
                   set the value to 1 here to release the lock.  If another thread
                   acquires it before we have woken up the other threads that's fine.
                   Equally, if another thread decremented the count and saw it was
                   still locked it will enter the RTS and try to acquire the lock
                   there.
                   It's probably better to reset it here rather than within the RTS
                   since it allows another thread to acquire the lock immediately
                   rather than after the rather long process of entering the RTS.
                   Resetting this needs to be atomic with respect to atomic increment
                   and decrement.  That's not a problem on X86 so a simple assignment
                   is sufficient but in the interpreter at least it's necessary to
                   acquire a lock. *)
            (
                atomicReset m;
                threadMutexUnlock m
            )
        end

        (* Try to lock the mutex.  If it was previously unlocked then lock it and
           return true otherwise return false.  Because we don't block here there is
           the possibility that the thread that has locked it could release the lock
           shortly afterwards.  The check for !m = 0w1 is an optimisation and nearly
           all the time it avoids the call to atomicDecr setting m to a negative value.
           There is a small chance that another thread could lock the mutex between the
           test for !m = 0w1 and the atomicDecr.  In that case the atomicDecr would
           return a negative value and the function that locked the mutex will have to
           call into the RTS to reset it when it is unlocked.  *)
        fun trylock (m: mutex): bool =
            if !m = 0w1 andalso atomicDecr m = 0w0
            then true (* We've acquired the lock. *)
            else false (* The lock was taken. *)
    end

    structure ConditionVar =
    struct
        open Thread

        (* A condition variable contains a lock and a list of suspended threads. *)
        type conditionVar = { lock: Mutex.mutex, threads: thread list ref }
        fun conditionVar(): conditionVar =
            { lock = Mutex.mutex(), threads = nvref nil }

        local
            val threadCondVarWait: Mutex.mutex -> unit = RunCall.rtsCallFull1 "PolyThreadCondVarWait"
            and threadCondVarWaitUntil: Mutex.mutex * Time.time -> unit = RunCall.rtsCallFull2 "PolyThreadCondVarWaitUntil"
        in
            fun innerWait({lock, threads}: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
            let
                val me = self() (* My thread id. *)
                
                fun waitAgain() =
                let
                    fun doFind [] = false | doFind(h::t) = equal(h, me) orelse doFind t
                    
                    fun removeThis [] = raise Fail "Thread missing in list"
                     |  removeThis (h::t) = if equal(h, me) then t else h :: removeThis t
                     
                    val () =
                        case t of
                            SOME time => threadCondVarWaitUntil(lock, time)
                        |   NONE => threadCondVarWait lock

                    val () = Mutex.lock lock (* Get the lock again.  *)
                    
                    (* Are we still on the list?  If so we haven't been explicitly woken
                       up.  We've either timed out, been interrupted or simply returned
                       because the RTS needed to process some asynchronous results.  *)
                    val stillThere = doFind(!threads)
                    open Time (* For >= *)
                in
                    if not stillThere
                    then (* We're done. *)
                    (
                        Mutex.unlock lock;
                        true
                    )
                    else if (case t of NONE => false | SOME t => Time.now() >= t)
                    then (* We've timed out. *)
                    (
                        threads := removeThis(! threads);
                        Mutex.unlock lock;
                        false
                    )
                    else
                    (
                        (* See if we've been interrupted.  If so remove ourselves
                           and exit. *)
                        testInterrupt()
                            handle exn => (threads := removeThis(! threads); Mutex.unlock lock; raise exn);
                        (* Otherwise just keep waiting. *)
                        waitAgain()
                    )
                end  
            in
                Mutex.lock lock; (* Lock the internal mutex. *)
                Mutex.unlock m; (* Unlock the external mutex *)
                threads := me :: !threads; (* Add ourselves to the list. *)
                waitAgain() (* Wait and return the result when we're done. *)
            end

            fun doWait(c: conditionVar, m: Mutex.mutex, t: Time.time option) : bool =
            let
                val originalIntstate = getInterruptState()
                (* Set this to handle interrupts synchronously unless we're already
                   ignoring them. *)
                val () =
                    if originalIntstate = InterruptDefer
                    then ()
                    else setInterruptState InterruptSynch;
                    
                (* Wait for the condition.  If it raises an exception we still
                   need to reacquire the lock unless we were handling interrupts
                   asynchronously. *)
                val result =
                    innerWait(c, m, t) handle exn =>
                        (
                            (* We had an exception.  If we were handling exceptions synchronously
                               we reacquire the lock.  If it was set to InterruptAsynchOnce this
                               counts as a single asynchronous exception and we restore the
                               state as InterruptSynch. *)
                            case originalIntstate of
                                InterruptDefer => (* Shouldn't happen?  *) Mutex.lock m
                            |   InterruptSynch => Mutex.lock m
                            |   InterruptAsynch => setInterruptState InterruptAsynch
                            |   InterruptAsynchOnce => setInterruptState InterruptSynch;

                            raise exn (* Reraise the exception*)
                        )
            in
                (* Restore the original interrupt state first. *)
                setInterruptState originalIntstate;
                (* Normal return.  Reacquire the lock before returning. *)
                Mutex.lock m;
                result
            end

            fun wait(c: conditionVar, m: Mutex.mutex) : unit =
                (doWait(c, m, NONE); ())
            and waitUntil(c: conditionVar, m: Mutex.mutex, t: Time.time) : bool =
                doWait(c, m, SOME t)
        end
        
        local
            (* This call wakes up the specified thread.  If the thread has already been
               interrupted and is not ignoring interrupts it returns false.  Otherwise
               it wakes up the thread and returns true.  We have to use this because
               we define that if a thread is interrupted before it is signalled then
               it raises Interrupt. *)
            val threadCondVarWake: thread -> bool = RunCall.rtsCallFast1 "PolyThreadCondVarWake"
            
            (* Wake a single thread if we can (signal). *)
            fun wakeOne [] = []
            |   wakeOne (thread::rest) =
                    if threadCondVarWake thread
                    then rest
                    else thread :: wakeOne rest
            (* Wake all threads (broadcast). *)
            fun wakeAll [] = [] (* Always returns the empty list. *)
            |   wakeAll (thread::rest) = (threadCondVarWake thread; wakeAll rest)
            
            fun signalOrBroadcast({lock, threads}: conditionVar, wakeThreads) : unit =
            let
                val originalState = getInterruptState()
            in
                (* Set this to handle interrupts synchronously unless we're already
                   ignoring them.  We need to do this to avoid an asynchronous
                   interrupt which could leave the internal lock in an inconsistent state. *)
                if originalState = InterruptDefer
                then ()
                else setInterruptState InterruptSynch;
                (* Get the condition var lock. *)
                Mutex.lock lock;
                threads := wakeThreads(! threads);
                Mutex.unlock lock;
                setInterruptState originalState; (* Restore original state. *)
                (* Test if we were interrupted while we were handling
                   interrupts synchronously. *)
                if originalState = InterruptAsynch orelse originalState = InterruptAsynchOnce
                then testInterrupt()
                else ()
            end
        in
            fun signal cv = signalOrBroadcast(cv, wakeOne)
            and broadcast cv = signalOrBroadcast(cv, wakeAll)
        end
    end
end;

structure ThreadLib:
sig
    val protect: Thread.Mutex.mutex -> ('a -> 'b) -> 'a -> 'b
end =
struct
    (* This applies a function while a mutex is being held. 
       Although this can be defined in terms of Thread.Thread.getAttributes it's
       defined here using the underlying calls.  The original version with
       getAttributes appeared as a major allocation hot-spot when building the
       compiler because "protect" is called round every access to the global
       name-space. *)
    fun protect m f a =
    let
        open Thread.Thread Thread.Mutex
        open Word
        (* Set this to handle interrupts synchronously except if we are blocking
           them.  We don't want to get an asynchronous interrupt while we are
           actually locking or unlocking the mutex but if we have to block to do
           IO then we should allow an interrupt at that point. *)
        val oldAttrs: Word.word = RunCall.loadWord(self(), 0w1)
        val () =
            if andb(oldAttrs, 0w6) = 0w0 (* Already deferred? *)
            then ()
            else RunCall.storeWord (self(), 0w1,
                    orb(andb(notb 0w6, oldAttrs), 0w4))
        fun restoreAttrs() =
        (
            RunCall.storeWord (self(), 0w1, oldAttrs);
            if andb(oldAttrs, 0w4) = 0w4 then testInterrupt() else ()
        )
        val () = lock m
        val result = f a
            handle exn =>
            (
                unlock m; restoreAttrs();
                (* Reraise the exception preserving the location information. *)
                PolyML.Exception.reraise exn
            )
    in
        unlock m;
        restoreAttrs();
        result
    end
end;


local
    fun prettyMutex _ _ (_: Thread.Mutex.mutex) = PolyML.PrettyString "?"
    and prettyThread _ _ (_: Thread.Thread.thread) = PolyML.PrettyString "?"
    and prettyCondVar _ _ (_: Thread.ConditionVar.conditionVar) = PolyML.PrettyString "?"
in
    val () = PolyML.addPrettyPrinter prettyMutex
    and () = PolyML.addPrettyPrinter prettyThread
    and () = PolyML.addPrettyPrinter prettyCondVar
end;