File: processes.ML

package info (click to toggle)
polyml 5.7.1-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 40,616 kB
  • sloc: cpp: 44,142; ansic: 26,963; sh: 22,002; asm: 13,486; makefile: 602; exp: 525; python: 253; awk: 91
file content (392 lines) | stat: -rw-r--r-- 17,277 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
(*
    Title:      Process package for ML.
    Author:     David C. J. Matthews
    Copyright (c) 2007

    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
    version 2.1 of the License, or (at your option) any later version.
    
    This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    Lesser General Public License for more details.
    
    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*)

(* This is provided for backwards compatibility.  New programs should use
   the Thread structure directly. *)

structure Process:
sig
   type 'a channel 
   val channel: unit -> '_a channel
   val send:    'a channel * 'a -> unit
   val receive: 'a channel -> 'a
   val fork:    (unit->unit)->unit
   val console: (unit->unit)->(unit->unit)
   val choice:  (unit->unit)*(unit->unit)->unit
   val interruptConsoleProcesses: unit->unit
end =
struct
    open Thread.Thread Thread.Mutex Thread.ConditionVar
    
    val debug = ref false and identifiers = ref 0 and ids = ref 0;

    (* Each process created by fork, console or choice has this information
       as thread-local data.  *)
    datatype processData = PROC of {
        synchro: (synchroniser * direction) list ref,   (* The synchroniser chain. *)
        blocker: conditionVar, (* Condition var to block this process. *)
        processNo: int          (* An identifier for debugging. *)
    }
    
    and synchroniser =
        SYNCH of {
        state: choicestate ref, (* The state of this choice. *)
        synchLock: mutex      (* A mutex to protect the state variable. *)
    }
    
    and choicestate = ChoiceUntaken | ChoiceTaken of direction
    
    and direction = DirLeft | DirRight;
    
    val procTag = Universal.tag(): processData Universal.tag

    (* Get the process data for this thread.  If it was created by a Thread call
       directly it may not yet have any process data so we need to make it now.  *)
    fun get_process_data(): processData =
        case getLocal procTag of
            SOME p => p
        |   NONE =>
            let
                val pnum = (identifiers := !identifiers+1; !identifiers);
                val pData = PROC { synchro = ref [], blocker = conditionVar(), processNo = pnum }
            in
                setLocal(procTag, pData);
                pData
            end

    datatype 'a channel =
        CHAN of {senders: 'a procVal list ref,
                 receivers: 'a option ref procVal list ref,
                 chanLock: mutex,
                 Id: int}

    (* This represents a suspended process.  The 'a is either a value to be
       sent or a "basket", a reference to hold the result. *)
    withtype 'a procVal = conditionVar * processData * 'a

    fun channel () : 'a channel =
        CHAN {senders = ref [], receivers = ref [],
              chanLock = mutex(), Id = (ids := !ids+1; !ids) }

    datatype 'a synchResult = NoMatch | FoundMatch of 'a procVal

    (* Prunes the synchroniser list to remove committed choices.
       Returns the first non-committed synchroniser or the first committed
       synchroniser with a choice that is taken in the "wrong" direction
       (i.e. which ndicates that this process must not be allowed to communicate). *)
    fun getActiveSynchroniser(PROC{synchro, ...}, unlockAfter) =
    let
        fun getSynch [] = []
        |   getSynch(l as (SYNCH{state, synchLock}, dir) :: t) =
            (
                lock synchLock;
                case ! state of
                    ChoiceUntaken => (* This is untaken.  Stop here. *)
                    (
                        if unlockAfter
                        then unlock synchLock
                        else (getSynch t; ()); (* We need to lock any others. *)
                        l
                    )
                |   ChoiceTaken d =>
                        (* This is taken.  Stop here if it is taken in a different way. *)
                    (
                        unlock synchLock;
                        if d = dir
                        then getSynch t
                        else l
                    )
            )
        val newSynchList = getSynch(! synchro)
    in
        (* We can update the list for this process.  We don't need to lock the
           synchro variable since it is only updated either by the process itself or
           when this process is waiting on a channel, which is locked before access. *)
        synchro := newSynchList;
        newSynchList
    end

    (* Try to find a matching process.  toSearch is the list of corresponding
       processes i.e. receivers if we are trying to send, senders if we are
       trying to receive.  The result is a pair of the updated version of the
       toSearch, with the matching process removed if a match has been found
       and the matching process's data. *)
    fun synchronise (toSearch: 'a procVal list, thisProcess) :
            'a procVal list * 'a synchResult =
    let     
        (* Release the lock on the synchroniser for the process that is looking for
           a partner.  This is only called if no matching process can be found.  *)
        fun releaseLock(PROC{synchro = ref synchro, ...}) =
            List.app
                (fn (SYNCH{synchLock, state=ref ChoiceUntaken}, _) => unlock synchLock
                  |  _ => ()) synchro

        (* Commit the choices and release the locks.  If some entries are shared
           between the two processes then we may find some entries already set. *)
        fun commitChoices(PROC{synchro=synchro as ref synch, ...}) =
        (
            List.app
                (fn (SYNCH{synchLock, state=state as ref ChoiceUntaken}, dir) =>
                     (state := ChoiceTaken dir; unlock synchLock)
                  |  _ => ()) synch;
            synchro := [] (* Since all are taken we can set this to the empty list. *)
        )

        (* Get the first synchroniser and lock it unless it is already committed. *)
        val mySynch = getActiveSynchroniser(thisProcess, false (* Leave locked. *))

        (* Get the list of synchronisers for a potential matching process.  Generally
           any process on the sender list will match a receiver and vice versa.  The
           exception is if the two processes are alternative choices.  We have to be
           careful with the synchroniser lists.  We've already locked the list for our
           process so we mustn't lock any synchronisers that are shared.  *)
        datatype matchResult =
            MrTaken | MrAlternatives | MrOK of (synchroniser * direction) list

        fun getMatchingSynchs(PROC{synchro, ...}) =
        let
            fun getSynch([], _) = MrOK []
            |   getSynch(l as (SYNCH{state, ...}, dir) :: t,
                         myL as (SYNCH{state=s, ...}, myDir) :: myT) =
                if s <> state 
                then (* Different references - safe to lock.  *)
                    lockSynch(l, myL)
                else (* Same reference - already locked. *)
                    if dir <> myDir (* These are different choices. *)
                then MrAlternatives (* Not allowed to communicate. *)
                else (* OK, same choice: test the rest*)
                    getSynch(t, myT)
            |   getSynch(l, []) =
                    (* The list of synchronisers for the original process is empty or
                       has run out before this.  *)
                    lockSynch(l, [])
    
            and lockSynch(l as (SYNCH{state, synchLock}, dir) :: t, myL) =
                (
                    lock synchLock;
                    case ! state of
                        ChoiceUntaken => (* This is untaken.  Stop here. *)
                        (
                            getSynch(t, myL); (* We need to lock any others. *)
                            MrOK l
                        )
                    |   ChoiceTaken d =>
                            (* This is taken.  Stop here if it is taken in a different way. *)
                        (
                            unlock synchLock;
                            if d = dir
                            then getSynch(t, myL)
                            else MrTaken
                        )
                )
            |   lockSynch _ = raise Match (* Suppress warning *)
        
        in
            getSynch(! synchro, mySynch) 
        end

        fun findAProcess [] = 
            (* Find a process that matches and return the new list of partners
              and the new list of runnable processes. *)
           (* No match *) ([], NoMatch)
        |   findAProcess((entry as (_,d,_)) :: t) =
            case getMatchingSynchs d of
                MrTaken =>
                    (* This process is a committed choice in a different direction.  Drop
                       it from the list since it can never communicate.  *)
                    findAProcess t
            |   MrAlternatives =>
                    (* This process is an alternative choice with our process.  It can
                       still communicate, just not with us.  Skip this and try the next. *)
                let
                    val (clist, result) = findAProcess t
                in
                    (entry :: clist, result)
                end
            |   MrOK _ =>
                    (t, FoundMatch entry) (* Return the new list. *)
        
    in
        case mySynch of
            (SYNCH{state = ref (ChoiceTaken _), ...}, _) :: _ =>
           (* This choice is already taken - kill this process.
              Actually all we do at this stage is pretend that the process
              cannot communicate, and suspend it.  Later it may be removed
              from the channel. *)
            (toSearch, NoMatch)
        |   _ => (* No synch or uncommitted choice. *)
            case findAProcess toSearch of
                t as (_, NoMatch) => (releaseLock thisProcess; t)
            |   t as (_, FoundMatch(_,p,_)) =>
                    (commitChoices thisProcess; commitChoices p; t)
    end

    (* We need to ensure that interrupts are delivered synchronously when
       synchronising rather than risk receiving an interrupt while holding a lock. *)
    fun blockInterrupt (f: unit->'a) =
    let
        open Thread
        val oldState = getAttributes()
    in
        case List.find (fn InterruptState _ => true | _ => false) oldState of
            SOME(InterruptState InterruptDefer) => f() (* Continue to defer. *)
        |   SOME(InterruptState InterruptSynch) => f() (* No need to change. *)
        |   _ => (* Unset(?) or asynchronous.  Have to make synchronous. *)
            let
                val () = setAttributes[InterruptState InterruptSynch]
                (* Call the function.  It may raise an Interrupt exception if it has to
                   wait.  In that case we still need to restore the old state. *)
                val result = 
                    f() handle exn => (setAttributes oldState; raise exn)
                val () = setAttributes oldState;
            in
                result
            end
    end

    fun send (CHAN {senders, receivers, chanLock, ...}, v:'a) =
        blockInterrupt(fn () =>
            let
                val () = lock chanLock;
                val myProcessData as PROC { blocker, ...} = get_process_data()
            in
                case synchronise(!receivers, myProcessData)
                of  (newlist, FoundMatch (p,_,basket)) (* Success *) =>
                    (
                        basket := SOME v; (* Put the sent value into the receiver's basket. *)
                        receivers := newlist;
                        signal p; (* Wake up the new thread. *)
                        unlock chanLock
                    )
                |   (newlist, NoMatch) (* Failure *) =>
                    (* Set the new receiver/sender list to include this process,
                       and suspend ourselves, releasing the lock. *)
                    (
                        senders := (blocker, myProcessData, v) :: !senders;
                        receivers := newlist;
                        (* Wait until we're woken up and release the lock.
                           This may result in an exception but if the exception is
                           raised the lock will be reacquired so we must unlock it in
                           the handler. *)
                        wait(blocker, chanLock)
                            handle exn => (unlock chanLock; raise exn);
                        (* We don't need the lock any longer. *)
                        unlock chanLock
                    )
            end
        )
    
    fun receive (CHAN {senders, receivers, chanLock, ...}): 'a =
        blockInterrupt(fn () =>
            let
                val () = lock chanLock;
                val myProcessData as PROC { blocker, ...} = get_process_data()
            in
                case synchronise(!senders, myProcessData)
                of  (newlist, FoundMatch (p,_,v)) (* Success *) =>
                    (
                        senders := newlist;
                        signal p; (* Wake up the sending thread. *)
                        unlock chanLock;
                        v (* This is our result *)
                    )
                |   (newlist, NoMatch) (* Failure *) =>
                    (* Set the new receiver/sender list to include this process,
                       and suspend ourselves, releasing the lock. *)
                    let
                        val basket = ref NONE; (* Create a basket to receive the result. *)
                    in
                        receivers := (blocker, myProcessData, basket) :: !receivers;
                        senders := newlist;
                        (* Wait until we're woken up and release the lock.
                           This may result in an exception but if the exception is
                           raised the lock will be reacquired so we must unlock it in
                           the handler. *)
                        wait(blocker, chanLock)
                            handle exn => (unlock chanLock; raise exn);
                        (* We don't need the lock any longer. *)
                        unlock chanLock;
                        valOf(!basket) (* This should have been set to SOME v by the sender. *)
                    end
            end
        )


    fun new_process f synch attrs =
    (* Make a new process. *)
    let
        val pnum = (identifiers := !identifiers+1; !identifiers);
        val data =
            PROC { synchro = ref synch,  processNo = pnum, blocker = conditionVar() }
        fun fun_to_fork () =
            (
                setLocal(procTag, data);
                (f () handle _ => ())
            )
        val newproc = fork(fun_to_fork, attrs)
    in
        if !debug then (PolyML.print("new_process:", data); ()) else ();
        newproc
    end
    
    fun fork f =
    let
        (* Get the parent's synchroniser and remove any redundant entries. *)
        val synch = getActiveSynchroniser(get_process_data(), true)
        val _ =
            new_process f synch (* Share the parent's synchroniser. *)
                [EnableBroadcastInterrupt false] (* Does not accept broadcasts. *)
    in
        ()
    end
    
    and console f =
    let
        (* Get the parent's synchroniser and remove any redundant entries. *)
        val synch = getActiveSynchroniser(get_process_data(), true)
        val threadId =
            new_process f synch (* Share the parent's synchroniser. *)
                [EnableBroadcastInterrupt true] (* Accepts broadcasts. *)
    in
        (* Return a function that will interrupt the process. *)
        fn () => interrupt threadId
    end

    and choice (f, g) =(* Fork a pair of "choice" processes. *)
    let
        (* Get the parent's synchroniser and remove any redundant entries. *)
        val synch = getActiveSynchroniser(get_process_data(), true)
        (* If the parent is already a Choice (whether Taken or not), we
           run the new processes in Parallel with it. The reason for this
           is that if we have choice( (choice(a,b); c), d)  we allow both
           a and c (say) to communicate (N.B.  "choice" creates two new
           processes and returns immediately so c runs in parallel with
           a and b).  It is actually equivalent to a.c + b.c + d .  *)
        val newSynch = SYNCH{state = ref ChoiceUntaken, synchLock = mutex()}
    in
        new_process g (synch @ [(newSynch, DirLeft)])
            [EnableBroadcastInterrupt false];
        new_process f (synch @ [(newSynch, DirRight)])
            [EnableBroadcastInterrupt false];
        ()
    end

    val interruptConsoleProcesses = broadcastInterrupt
end;