File: processes.ML

package info (click to toggle)
polyml 5.2.1-1.1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd, wheezy
  • size: 19,692 kB
  • ctags: 17,567
  • sloc: cpp: 37,221; sh: 9,591; asm: 4,120; ansic: 428; makefile: 203; ml: 191; awk: 91; sed: 10
file content (392 lines) | stat: -rw-r--r-- 14,081 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
(*
    Title:      Process package for ML.
    Author:     David C. J. Matthews
	Copyright (c) 2007

	This library is free software; you can redistribute it and/or
	modify it under the terms of the GNU Lesser General Public
	License as published by the Free Software Foundation; either
	version 2.1 of the License, or (at your option) any later version.
	
	This library is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
	Lesser General Public License for more details.
	
	You should have received a copy of the GNU Lesser General Public
	License along with this library; if not, write to the Free Software
	Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
*)

(* This is provided for backwards compatibility.  New programs should use
   the Thread structure directly. *)

(* The legacy process/channel interface, implemented on top of the Thread
   structure.  Only the names listed in this signature are visible outside. *)
structure Process:
sig
   (* A synchronous channel carrying values of type 'a. *)
   type 'a channel 
   (* Create a new channel with no waiting senders or receivers. *)
   val channel: unit -> '_a channel
   (* Send a value.  If no suitable receiver is waiting the caller is
      suspended until a receiver takes the value. *)
   val send:    'a channel * 'a -> unit
   (* Receive a value.  If no suitable sender is waiting the caller is
      suspended until a sender supplies one. *)
   val receive: 'a channel -> 'a
   (* Run the function in a new thread that does not accept broadcast
      interrupts.  Exceptions raised by the function are discarded. *)
   val fork:    (unit->unit)->unit
   (* Run the function in a new thread that accepts broadcast interrupts.
      The result is a function that interrupts that thread. *)
   val console: (unit->unit)->(unit->unit)
   (* Run both functions in new threads as the two sides of a new choice.
      Once one side has communicated on a channel the choice is committed
      to that side and the other side is no longer matched with a partner. *)
   val choice:  (unit->unit)*(unit->unit)->unit
   (* Send a broadcast interrupt (this is Thread.Thread.broadcastInterrupt). *)
   val interruptConsoleProcesses: unit->unit
end =
struct
	open Thread.Thread Thread.Mutex Thread.ConditionVar
	
	(* Internal state of the structure: a switch for debug printing and two
	   counters, one handing out process numbers and one channel numbers. *)
	val debug = ref false
	val identifiers = ref 0
	val ids = ref 0;

	(* The record held as thread-local data by every process that goes through
	   this structure, followed by the types describing the choices that a
	   process can be part of. *)
	datatype processData =
		PROC of {
			processNo: int,			(* An identifying number, for debugging. *)
			blocker: conditionVar,	(* The process waits on this while it is suspended. *)
			synchro: (synchroniser * direction) list ref
									(* The choices this process belongs to, each
									   paired with the side the process is on. *)
		}

	(* A single choice, shared between the processes that take part in it. *)
	and synchroniser =
		SYNCH of {
			synchLock: mutex,		(* Protects the state field. *)
			state: choicestate ref	(* Whether, and in which direction, the choice
									   has been decided. *)
		}

	(* A choice starts untaken and afterwards records the side that took it. *)
	and choicestate = ChoiceUntaken | ChoiceTaken of direction

	(* The two sides of a choice. *)
	and direction = DirLeft | DirRight;
	
	(* The key under which a thread's processData is kept in its local store. *)
	val procTag: processData Universal.tag = Universal.tag()

	(* Return the process record kept in the calling thread's local data.
	   A thread that was started directly through the Thread structure has no
	   record yet, so in that case a new one, with an empty synchroniser chain
	   and the next process number, is created and stored before it is returned. *)
	fun get_process_data(): processData =
	let
		fun makeFresh () =
		let
			val () = identifiers := !identifiers + 1
			val myNumber = !identifiers
			val fresh =
				PROC { synchro = ref [], blocker = conditionVar(), processNo = myNumber }
			val () = setLocal(procTag, fresh)
		in
			fresh
		end
	in
		case getLocal procTag of
			NONE => makeFresh ()
		|	SOME existing => existing
	end

	(* A channel: the processes suspended in send, the processes suspended in
	   receive, the mutex that is taken before either list is used, and a
	   number allocated when the channel is created. *)
	datatype 'a channel =
		CHAN of {
			Id: int,
			chanLock: mutex,
			senders: 'a procVal list ref,
			receivers: 'a option ref procVal list ref
		}

	(* An entry for a suspended process: the condition variable it is waiting
	   on, its process record and a payload.  For a sender the payload is the
	   value being sent; for a receiver it is a "basket", a reference in which
	   the value will be left. *)
	withtype 'a procVal = conditionVar * processData * 'a

	(* Make a new channel: both waiting lists empty, a new mutex, and the next
	   channel number as its Id. *)
	fun channel () : 'a channel =
	let
		val waitingSenders = ref []
		val waitingReceivers = ref []
		val lockForChannel = mutex()
		val () = ids := !ids + 1
	in
		CHAN {senders = waitingSenders, receivers = waitingReceivers,
		      chanLock = lockForChannel, Id = !ids}
	end

	(* The outcome of looking for a partner: either none could be used, or the
	   waiting-list entry of the partner that was chosen. *)
	datatype 'a synchResult =
		NoMatch
	|	FoundMatch of 'a procVal

	(* Prunes the synchroniser list of a process to remove committed choices.
	   Leading entries whose choice has been taken in this process's own
	   direction are dropped.  The result is the remainder of the list, which
	   starts either with the first non-committed synchroniser or with the first
	   committed synchroniser whose choice was taken in the "wrong" direction
	   (i.e. which indicates that this process must not be allowed to
	   communicate).  The pruned list is also stored back into the process.

	   unlockAfter controls what is left locked on return:
	     true  - nothing: the lock on the untaken synchroniser is released.
	     false - the first untaken synchroniser stays locked and the scan
	             carries on down the rest of the list, applying the same rule
	             to each entry, so that later untaken synchronisers are
	             locked as well. *)
	fun getActiveSynchroniser(PROC{synchro, ...}, unlockAfter) =
	let
		fun getSynch [] = []
		|	getSynch(l as (SYNCH{state, synchLock}, dir) :: t) =
			(
				(* The state may only be examined while holding its mutex. *)
				lock synchLock;
				case ! state of
					ChoiceUntaken => (* This is untaken.  Stop here. *)
					(
						if unlockAfter
						then unlock synchLock
						(* The list returned by this recursive call is thrown
						   away; it is made only for its locking effect. *)
						else (getSynch t; ()); (* We need to lock any others. *)
						l
					)
				|	ChoiceTaken d =>
						(* This is taken.  Stop here if it is taken in a different way. *)
					(
						(* A taken choice is never left locked. *)
						unlock synchLock;
						if d = dir
						then getSynch t
						else l
					)
			)
		val newSynchList = getSynch(! synchro)
	in
		(* We can update the list for this process.  We don't need to lock the
		   synchro variable since it is only updated either by the process itself or
		   when this process is waiting on a channel, which is locked before access. *)
		synchro := newSynchList;
		newSynchList
	end

	(* Try to find a matching process.  toSearch is the list of corresponding
	   processes i.e. receivers if we are trying to send, senders if we are
	   trying to receive.  The result is a pair of the updated version of the
	   toSearch, with the matching process removed if a match has been found
	   and the matching process's data.

	   thisProcess is the process record of the caller.  On FoundMatch the
	   choices of both processes have been committed and their synchroniser
	   chains emptied.  On NoMatch the synchroniser locks taken for the caller
	   are released by releaseLock, except when the caller's own choice turned
	   out to be taken already, in which case nothing further is done here.
	   Both callers in this file take the channel's lock before calling this
	   function. *)
	fun synchronise (toSearch: 'a procVal list, thisProcess) :
			'a procVal list * 'a synchResult =
	let		
		(* Release the lock on the synchroniser for the process that is looking for
		   a partner.  This is only called if no matching process can be found.
		   NOTE(review): this unlocks every entry of the chain whose state is
		   still ChoiceUntaken, so it relies on exactly those entries having been
		   locked by getActiveSynchroniser - confirm for a chain in which a
		   wrongly-taken entry precedes an untaken one. *)
		fun releaseLock(PROC{synchro = ref synchro, ...}) =
			List.app
				(fn (SYNCH{synchLock, state=ref ChoiceUntaken}, _) => unlock synchLock
				  |  _ => ()) synchro

		(* Commit the choices and release the locks.  If some entries are shared
		   between the two processes then we may find some entries already set. *)
		fun commitChoices(PROC{synchro=synchro as ref synch, ...}) =
		(
			List.app
				(fn (SYNCH{synchLock, state=state as ref ChoiceUntaken}, dir) =>
					 (state := ChoiceTaken dir; unlock synchLock)
				  |  _ => ()) synch;
			synchro := [] (* Since all are taken we can set this to the empty list. *)
		)

		(* Get the first synchroniser and lock it unless it is already committed. *)
		val mySynch = getActiveSynchroniser(thisProcess, false (* Leave locked. *))

		(* Get the list of synchronisers for a potential matching process.  Generally
		   any process on the sender list will match a receiver and vice versa.  The
		   exception is if the two processes are alternative choices.  We have to be
		   careful with the synchroniser lists.  We've already locked the list for our
		   process so we mustn't lock any synchronisers that are shared.

		   MrTaken:        the candidate is committed to a different direction.
		   MrAlternatives: the candidate is on the other side of a choice that it
		                   shares with this process.
		   MrOK:           the candidate can be used; the list carried is the part
		                   of its chain from the first entry locked here. *)
		datatype matchResult =
			MrTaken | MrAlternatives | MrOK of (synchroniser * direction) list

		fun getMatchingSynchs(PROC{synchro, ...}) =
		let
			(* Walk the candidate's chain alongside our own.  Entries are
			   recognised as shared by comparing their state references. *)
			fun getSynch([], _) = MrOK []
			|	getSynch(l as (SYNCH{state, ...}, dir) :: t,
						 myL as (SYNCH{state=s, ...}, myDir) :: myT) =
				if s <> state 
				then (* Different references - safe to lock.  *)
					lockSynch(l, myL)
				else (* Same reference - already locked. *)
					if dir <> myDir (* These are different choices. *)
				then MrAlternatives (* Not allowed to communicate. *)
				else (* OK, same choice: test the rest*)
					getSynch(t, myT)
			|	getSynch(l, []) =
					(* The list of synchronisers for the original process is empty or
					   has run out before this.  *)
					lockSynch(l, [])
	
			(* Lock the head of the candidate's chain and look at its state. *)
			and lockSynch(l as (SYNCH{state, synchLock}, dir) :: t, myL) =
				(
					lock synchLock;
					case ! state of
						ChoiceUntaken => (* This is untaken.  Stop here. *)
						(
							(* NOTE(review): the result of this call is discarded,
							   so an MrTaken or MrAlternatives found further down
							   the chain does not change the MrOK returned below -
							   confirm that this is intended. *)
							getSynch(t, myL); (* We need to lock any others. *)
							MrOK l
						)
					|	ChoiceTaken d =>
							(* This is taken.  Stop here if it is taken in a different way. *)
						(
							unlock synchLock;
							if d = dir
							then getSynch(t, myL)
							else MrTaken
						)
				)
			|   lockSynch _ = raise Match (* Suppress warning *)
		
		in
			getSynch(! synchro, mySynch) 
		end

		fun findAProcess [] = 
	       	(* Find a process that matches and return the new list of partners
			  and the new list of runnable processes. *)
	       (* No match *) ([], NoMatch)
		|	findAProcess((entry as (p,d,v)) :: t) =
			case getMatchingSynchs d of
				MrTaken =>
					(* This process is a committed choice in a different direction.  Drop
			    	   it from the list since it can never communicate.  *)
					findAProcess t
			|	MrAlternatives =>
					(* This process is an alternative choice with our process.  It can
					   still communicate, just not with us.  Skip this and try the next. *)
				let
					val (clist, result) = findAProcess t
				in
					(entry :: clist, result)
				end
			|	MrOK synchs =>
					(* The first usable entry is chosen; the entries after it
					   are returned without being examined. *)
					(t, FoundMatch entry) (* Return the new list. *)
		
	in
		case mySynch of
			(SYNCH{state = ref (ChoiceTaken _), ...}, _) :: _ =>
	       (* This choice is already taken - kill this process.
			  Actually all we do at this stage is pretend that the process
			  cannot communicate, and suspend it.  Later it may be removed
			  from the channel. *)
	        (toSearch, NoMatch)
		|	_ => (* No synch or uncommitted choice. *)
			case findAProcess toSearch of
				t as (_, NoMatch) => (releaseLock thisProcess; t)
			|	t as (_, FoundMatch(_,p,_)) =>
					(* Commit our own chain first, then the partner's. *)
					(commitChoices thisProcess; commitChoices p; t)
	end

	(* Run f with interrupts delivered at most synchronously, so that an
	   interrupt cannot arrive at an arbitrary point while a lock is held.
	   If the thread already defers interrupts or handles them synchronously
	   f is simply called.  Otherwise the thread is switched to synchronous
	   handling for the duration of the call and its previous attributes are
	   put back afterwards, whether f returns or raises an exception. *)
	fun blockInterrupt (f: unit->'a) =
	let
		open Thread
		val savedAttrs = getAttributes()

		fun isInterruptSetting (InterruptState _) = true
		|	isInterruptSetting _ = false

		(* The interrupt state was unset(?) or asynchronous: make it
		   synchronous around the call.  f may raise Interrupt if it has to
		   wait, and the old attributes must be restored in that case too. *)
		fun runSynchronously () =
		let
			val () = setAttributes[InterruptState InterruptSynch]
			val outcome =
				f() handle failure => (setAttributes savedAttrs; raise failure)
		in
			setAttributes savedAttrs;
			outcome
		end
	in
		case List.find isInterruptSetting savedAttrs of
			SOME(InterruptState InterruptSynch) => f() (* Already synchronous. *)
		|	SOME(InterruptState InterruptDefer) => f() (* Stay deferred. *)
		|	_ => runSynchronously ()
	end

	(* Send the value v on a channel.
	   The channel is locked and a partner is looked for among the waiting
	   receivers.  If one is found the value is put into its basket, it is
	   woken up and send returns at once.  Otherwise this process is added to
	   the channel's list of senders and is suspended until a receiver takes
	   the value and signals it.
	   The whole operation runs under blockInterrupt, so an interrupt can only
	   be delivered while the process is waiting.  In that case this process's
	   entry is taken off the list of senders before the exception is passed
	   on, so that a later receiver cannot be paired with a process that is no
	   longer waiting here. *)
	fun send (ch: 'a channel as CHAN {senders, receivers, chanLock, ...}, v:'a) =
		blockInterrupt(fn () =>
			let
				val () = lock chanLock;
				val myProcessData as PROC { blocker, synchro = mySynchro, ...} =
					get_process_data()
				(* Remove this process from the list of senders.  Entries are
				   recognised by the synchro reference of their process record:
				   each record is created with its own reference, and the
				   values being sent cannot in general be compared.  Must be
				   called with chanLock held. *)
				fun withdraw () =
					senders :=
						List.filter
							(fn (_, PROC{synchro = s, ...}, _) => s <> mySynchro)
							(!senders)
			in
				case synchronise(!receivers, myProcessData)
				of	(newlist, FoundMatch (p,_,basket)) (* Success *) =>
					(
						basket := SOME v; (* Put the sent value into the receiver's basket. *)
						receivers := newlist;
						signal p; (* Wake up the new thread. *)
						unlock chanLock
					)
				|	(newlist, NoMatch) (* Failure *) =>
		   			(* Set the new receiver/sender list to include this process,
					   and suspend ourselves, releasing the lock. *)
					(
						senders := (blocker, myProcessData, v) :: !senders;
			          	receivers := newlist;
						(* Wait until we're woken up and release the lock.
						   This may result in an exception but if the exception is
						   raised the lock will be reacquired so we must unlock it in
						   the handler.  Before unlocking we take ourselves off the
						   list: nobody is going to wait for that entry any more. *)
						wait(blocker, chanLock)
							handle exn => (withdraw(); unlock chanLock; raise exn);
						(* We don't need the lock any longer. *)
						unlock chanLock
					)
			end
		)
	
	(* Receive a value from a channel.
	   The channel is locked and a partner is looked for among the waiting
	   senders.  If one is found its value is taken, it is woken up and the
	   value is returned at once.  Otherwise this process is added to the
	   channel's list of receivers together with an empty basket, and is
	   suspended until a sender fills the basket and signals it.
	   The whole operation runs under blockInterrupt, so an interrupt can only
	   be delivered while the process is waiting.  In that case this process's
	   entry is taken off the list of receivers before the exception is passed
	   on, so that a later sender cannot leave its value in a basket that
	   nobody is going to look at. *)
	fun receive (ch: 'a channel as CHAN {senders, receivers, chanLock, ...}): 'a =
		blockInterrupt(fn () =>
			let
				val () = lock chanLock;
				val myProcessData as PROC { blocker, ...} = get_process_data()
			in
				case synchronise(!senders, myProcessData)
				of	(newlist, FoundMatch (p,_,v)) (* Success *) =>
					(
						senders := newlist;
						signal p; (* Wake up the sending thread. *)
						unlock chanLock;
						v (* This is our result *)
					)
				|	(newlist, NoMatch) (* Failure *) =>
		   			(* Set the new receiver/sender list to include this process,
					   and suspend ourselves, releasing the lock. *)
					let
						val basket = ref NONE; (* Create a basket to receive the result. *)
						(* Remove the entry holding this basket from the list of
						   receivers.  References can be compared for identity
						   whatever they contain, so this picks out exactly the
						   entry added below.  Must be called with chanLock held. *)
						fun withdraw () =
							receivers :=
								List.filter (fn (_, _, b) => b <> basket) (!receivers)
					in
						receivers := (blocker, myProcessData, basket) :: !receivers;
			          	senders := newlist;
						(* Wait until we're woken up and release the lock.
						   This may result in an exception but if the exception is
						   raised the lock will be reacquired so we must unlock it in
						   the handler.  Before unlocking we take ourselves off the
						   list: nobody is going to empty that basket any more. *)
						wait(blocker, chanLock)
							handle exn => (withdraw(); unlock chanLock; raise exn);
						(* We don't need the lock any longer. *)
						unlock chanLock;
						(* This should have been set to SOME v by the sender; valOf
						   raises Option if the basket is still empty. *)
						valOf(!basket)
					end
			end
		)


	(* Start f in a new thread with the given thread attributes.  The thread
	   is given a process record with the next process number, its own
	   condition variable and synch as its synchroniser chain.  Any exception
	   raised by f is discarded.  The result is the new thread. *)
	fun new_process f synch attrs =
	let
		val () = identifiers := !identifiers + 1
		val serial = !identifiers
		val procRec =
			PROC { synchro = ref synch, processNo = serial, blocker = conditionVar() }

		(* What the new thread runs: install the record as the thread's
		   local data, then call f, ignoring any exception. *)
		fun threadBody () =
		let
			val () = setLocal(procTag, procRec)
		in
			f () handle _ => ()
		end

		val started = fork(threadBody, attrs)
		val () =
			if !debug
			then (PolyML.print("new_process:", procRec); ())
			else ()
	in
		started
	end
	
	(* The synchroniser chain of the calling process with any redundant
	   entries removed and nothing left locked.  Every new process starts
	   from its parent's chain. *)
	fun parentSynch () = getActiveSynchroniser(get_process_data(), true)

	(* Run f as a new process that shares the parent's synchroniser chain and
	   does not accept broadcast interrupts. *)
	fun fork f =
		ignore (new_process f (parentSynch ()) [EnableBroadcastInterrupt false])

	(* Run f as a new process that shares the parent's synchroniser chain and
	   does accept broadcast interrupts.  The result is a function that will
	   interrupt the process. *)
	fun console f =
	let
		val child = new_process f (parentSynch ()) [EnableBroadcastInterrupt true]
	in
		fn () => interrupt child
	end

	(* Fork a pair of "choice" processes.
	   If the parent is already a Choice (whether Taken or not), the new
	   processes run in Parallel with it: the new choice is added to the end
	   of the parent's chain rather than replacing it.  So with
	   choice( (choice(a,b); c), d) both a and c (say) are allowed to
	   communicate (N.B. "choice" creates two new processes and returns
	   immediately so c runs in parallel with a and b).  It is actually
	   equivalent to a.c + b.c + d . *)
	fun choice (f, g) =
	let
		val inherited = parentSynch ()
		val decision = SYNCH{state = ref ChoiceUntaken, synchLock = mutex()}
		(* Start one side of the choice; neither side accepts broadcasts. *)
		fun spawn (body, side) =
			ignore
				(new_process body (inherited @ [(decision, side)])
					[EnableBroadcastInterrupt false])
	in
		spawn (g, DirLeft);
		spawn (f, DirRight)
	end

	(* Send a broadcast interrupt.  Processes started with console are created
	   with EnableBroadcastInterrupt true; those started with fork or choice
	   are created with it false. *)
	fun interruptConsoleProcesses () = broadcastInterrupt ()
end;