This is an initial port of CML from SML/NJ to MLton.
All of the core CML functionality is present:
signature CML; structure CML : CML
signature SYNC_VAR; structure SyncVar : SYNC_VAR
signature MAILBOX; structure Mailbox : MAILBOX
signature MULTICAST; structure Multicast : MULTICAST
signature SIMPLE_RPC; structure SimpleRPC : SIMPLE_RPC
signature RUN_CML; structure RunCML : RUN_CML
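For example, the familiar core operations (channels, spawn, synchronous
send/recv) are available in the usual way; here is a minimal sketch of a
channel rendezvous, to be run under RunCML.doit (described just below):

fun answer () : int =
   let
      val ch : int CML.chan = CML.channel ()
      (* the spawned thread offers 42 on the channel; recv synchronizes with it *)
      val _ = CML.spawn (fn () => CML.send (ch, 42))
   in
      CML.recv ch
   end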
The RUN_CML signature is minimal:
signature RUN_CML =
   sig
      val isRunning : unit -> bool
      val doit : (unit -> unit) * Time.time option -> OS.Process.status
      val shutdown : OS.Process.status -> 'a
   end
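For illustration, a whole program might be run roughly as follows (a sketch
only; the Time.time option is assumed here to be the preemption quantum, and
the particular value is arbitrary):

val status : OS.Process.status =
   RunCML.doit (fn () => print "hello, CML\n",
                SOME (Time.fromMilliseconds 50))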
This does not include all of the cleanup and logging operations of the
SML/NJ RunCML structure. However, the implementation does include the
CML.timeOutEvt and CML.atTimeEvt functions, and a preemptive scheduler
that knows to sleep when there are no ready threads and some threads are
blocked on time events.
NOTE: None of the Standard ML Basis Library has been made either
MLton.Thread or CML safe. Many of the IO and OS operations have
event-based equivalents, which should be implemented. The "right" way to
call a stateful Basis Library function is to wrap the call with
MLton.Thread.atomically.
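For example, a stateful call would be wrapped roughly like this (a sketch;
the particular Basis Library call is chosen arbitrarily for illustration):

val line : string option =
   MLton.Thread.atomically (fn () => TextIO.inputLine TextIO.stdIn)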
Implementation notes:
The implementation of CML in SML/NJ is built upon the first-class
continuations of the SMLofNJ.Cont module:
type 'a cont
val callcc : ('a cont -> 'a) -> 'a
val throw : 'a cont -> 'a -> 'b
val isolate : ('a -> unit) -> 'a cont
The implementation of CML in MLton is built upon the first-class
threads of the MLton.Thread module:
type 'a t
type ready_t
val new : ('a -> unit) -> 'a t
val prepVal : 'a t * 'a -> ready_t
val switch : ('a t -> ready_t) -> 'a
The port is relatively straightforward, because CML throws to a
continuation at most once. Hence, an "abstract" implementation of
CML could be built upon first-class one-shot continuations, which map
equally well to SML/NJ's continuations and MLton's threads.
The "essence" of the port is to transform:
... callcc (fn k => ... throw k' v') ...
to
... switch (fn t => ... prepVal (t', v')) ...
which suffices for the vast majority of the CML implementation.
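As a concrete (hypothetical) instance of the transformation, a simple
"yield" might be written in the two styles as follows, assuming a queue
named readyQueue with enque/deque helpers (none of these names come from
either library):

(* SML/NJ style: readyQueue holds unit cont values *)
fun yield () =
   callcc (fn k => (enque (readyQueue, k)
                    ; throw (deque readyQueue) ()))

(* MLton style: readyQueue holds ready_t values *)
fun yield () =
   switch (fn t => (enque (readyQueue, prepVal (t, ()))
                    ; deque readyQueue))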
There was only one complicated transformation: blocking multiple base
events. In SML/NJ CML, the representation of base events is given by:
datatype 'a event_status
   = ENABLED of {prio : int, doFn : unit -> 'a}
   | BLOCKED of {transId : trans_id ref,
                 cleanUp : unit -> unit,
                 next : unit -> unit} -> 'a
type 'a base_evt = unit -> 'a event_status
When synchronizing on a set of base events, which are all blocked, we
must invoke each BLOCKED function with the same transId and cleanUp
(the transId is (checked and) set to CANCEL by the cleanUp function,
which is invoked by the first enabled event; this "fizzles" every
other event in the synchronization group that later becomes enabled).
However, each BLOCKED function is implemented by a callcc, so that
when the event is enabled, it throws back to the point of
synchronization. Hence, the next function (which doesn't return) is
invoked by the BLOCKED function to escape the callcc and continue in
the thread performing the synchronization. In SML/NJ this is
implemented as follows:
fun ext ([], blockFns) = callcc (fn k => let
      val throw = throw k
      val (transId, setFlg) = mkFlg ()
      fun log [] = S.atomicDispatch ()
        | log (blockFn :: r) =
            throw (blockFn {transId = transId,
                            cleanUp = setFlg,
                            next = fn () => log r})
      in
         log blockFns; error "[log]"
      end)
(Note that S.atomicDispatch invokes the continuation of the next thread on
the ready queue.)  This doesn't map well to the MLton
thread model. Although it follows the
... callcc (fn k => ... throw k v) ...
model, the fact that blockFn will also attempt to do
... callcc (fn k' => ... next ()) ...
means that the naive transformation will result in nested switch-es.
We need to think a little more about what this code is trying to do.
Essentially, each blockFn wants to capture this continuation, hold on
to it until the event is enabled, and continue with next; when the
event is enabled, before invoking the continuation and returning to
the synchronization point, the cleanUp and other event specific
operations are performed.
To accomplish the same effect in the MLton thread implementation, we
have the following:
datatype 'a status =
   ENABLED of {prio : int, doitFn : unit -> 'a}
 | BLOCKED of {transId : trans_id,
               cleanUp : unit -> unit,
               next : unit -> rdy_thread} -> 'a
type 'a base = unit -> 'a status
and
fun ext ([], blockFns) : 'a =
   S.atomicSwitch
   (fn (t : 'a S.thread) =>
    let
       val (transId, cleanUp) = TransID.mkFlg ()
       fun log blockFns : S.rdy_thread =
          case blockFns of
             [] => S.next ()
           | blockFn :: blockFns =>
                (S.prep o S.new)
                (fn _ => fn () =>
                 let
                    val () = S.atomicBegin ()
                    val x = blockFn {transId = transId,
                                     cleanUp = cleanUp,
                                     next = fn () => log blockFns}
                 in S.switch (fn _ => S.prepVal (t, x))
                 end)
    in
       log blockFns
    end)
To avoid the nested switch-es, I run each blockFn in its own thread, whose
only purpose is to return to the synchronization point. This corresponds
to the throw (blockFn {...}) in the SML/NJ implementation. I'm worried
that this implementation might be a little expensive, since it starts a
new thread for each blocked event (though only when there are multiple
blocked events in a synchronization group). But I don't see another way
of implementing this behavior in the MLton thread model.
Note that another way of thinking about what is going on is to consider
each blockFn as prepending a different set of actions to the thread t. It
might be possible to provide an MLton.Thread.unsafePrepend:
fun unsafePrepend (T r: 'a t, f: 'b -> 'a): 'b t =
   let
      val t =
         case !r of
            Dead => raise Fail "prepend to a Dead thread"
          | New g => New (g o f)
          | Paused (g, t) => Paused (fn h => g (f o h), t)
   in (* r := Dead; *)
      T (ref t)
   end
I have commented out the r := Dead, which would allow multiple
prepends to the same thread (i.e., not destroying the original thread
in the process). Of course, only one of the threads could be run: if
the original thread were in the Paused state, then multiple threads
would share the underlying runtime/primitive thread. Now, this
matches the "one-shot" nature of CML continuations/threads, but I'm
not comfortable with extending the MLton.Thread module with such an
unsafe operation.
Other than this complication with blocking multiple base events, the
port was quite routine. (As a very pleasant surprise, the CML
implementation in SML/NJ doesn't use any SML/NJ-isms.) There is a
slight difference in the way in which critical sections are handled in
SML/NJ and MLton; since MLton.Thread.switch _always_ leaves a critical
section, it is sometimes necessary to add additional atomicBegin/End-s
to ensure that we remain in a critical section after a thread switch.
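As a small sketch of that pattern (mirroring the scheduler code above;
criticalWork is a hypothetical placeholder), the body handed to S.new
re-enters the critical section as its first action, because the switch that
transfers control to it will already have left it:

val readyThread : S.rdy_thread =
   (S.prep o S.new)
      (fn _ => fn () =>
          (S.atomicBegin ()   (* the switch into this thread left the critical
                                 section; re-enter it before touching shared state *)
           ; criticalWork ()  (* hypothetical work that must stay atomic *)
           ; S.atomicEnd ()))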
While looking at virtually every file in the core-CML implementation,
I took the liberty of simplifying things where it seemed possible; in
terms of style, the implementation is about half-way between Reppy's
original and MLton's.
Some changes of note:
* util/ contains all pertinent data structures: (functional and
  imperative) queues and (functional) priority queues. Hence, it
  should be easier to switch in more efficient or real-time
  implementations.
* core-cml/scheduler.sml: in both implementations, this is where most
  of the interesting action takes place. I've made the connection
  between MLton.Thread.t-s and ThreadID.thread_id-s more abstract
  than it is in the SML/NJ implementation, and encapsulated all of
  the MLton.Thread operations in this module.
* eliminated all of the "by hand" inlining
Some thoughts on future extensions. The CML documentation says the
following:
   CML.joinEvt: thread_id -> unit event

   joinEvt tid

      creates an event value for synchronizing on the termination of
      the thread with the ID tid.

      There are three ways that a thread may terminate: the
      function that was passed to spawn (or spawnc) may return; it
      may call the exit function, or it may have an uncaught
      exception.

      Note that joinEvt does not distinguish between these cases;
      it also does not become enabled if the named thread
      deadlocks (even if it is garbage collected).
I believe that MLton.Finalizable might be able to relax that last
restriction. Upon the creation of a 'a Scheduler.thread, we could
attach a finalizer to the underlying 'a MLton.Thread.t that enables
the joinEvt (in the associated ThreadID.thread_id) when the 'a
MLton.Thread.t becomes unreachable.
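A sketch of that idea (ThreadID.new and ThreadID.enableJoinEvt are
hypothetical names for the relevant operations):

fun newThread (f : 'a -> unit)
   : 'a MLton.Thread.t MLton.Finalizable.t * ThreadID.thread_id =
   let
      val tid = ThreadID.new ()   (* hypothetical *)
      val t = MLton.Finalizable.new (MLton.Thread.new f)
      (* when the underlying thread becomes unreachable, enable its joinEvt *)
      val () =
         MLton.Finalizable.addFinalizer (t, fn _ => ThreadID.enableJoinEvt tid)
   in
      (t, tid)
   end

Later uses of the underlying thread would then go through
MLton.Finalizable.withValue, which keeps the thread alive for the
duration of the call.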
I don't know why CML doesn't have
CML.kill: thread_id -> unit
which has a fairly simple implementation -- setting a kill flag in the
thread_id and adjusting the scheduler to discard any killed threads
that it takes off the ready queue. The fairness of the scheduler
ensures that a killed thread will eventually be discarded. The
semantics are a little murky for blocked threads that are killed,
though. For example, consider a thread blocked on SyncVar.mTake mv
and a thread blocked on SyncVar.mGet mv. If the first thread is
killed while blocked, and a third thread does SyncVar.mPut (mv, x),
then we might expect the second thread to be enabled, and never the
first. But if only the ready queue is able to discard killed
threads, then the SyncVar.mPut could enable the first thread (putting
it on the ready queue, from which it will be discarded) and leave the
second thread blocked. We could solve this by adjusting the
TransID.trans_id types and the "cleaner" functions to look for both
cancelled transactions and transactions on killed threads.
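A sketch of the scheduler half of this (deque, readyQueue, and isKilled
are hypothetical names; the "cleaner" functions would perform the
analogous check on blocked transactions):

(* the "next ready thread" loop skips threads whose kill flag is set *)
fun nextReady () : S.rdy_thread =
   let
      val (tid, t) = deque readyQueue
   in
      if isKilled tid then nextReady () else t
   end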
Between CML.timeOutEvt and CML.kill, one could give an efficient
solution to the recent comp.lang.ml post about terminating a function
that doesn't complete in a given time:
fun timeOut (f : unit -> 'a, t : Time.time) : 'a option =
   let
      val iv = SyncVar.iVar ()
      val tid = CML.spawn (fn () => SyncVar.iPut (iv, f ()))
   in
      CML.select
      [CML.wrap (CML.timeOutEvt t, fn () => (CML.kill tid; NONE)),
       CML.wrap (SyncVar.iGetEvt iv, fn x => SOME x)]
   end