1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
|
#lang zuo/base
;; public API of the thread library; the thread and channel creation
;; functions are defined below as `make-thread` and `make-channel` and
;; exported under the names `thread` and `channel`, so the struct
;; constructors with those names are not exported themselves
(provide call-in-main-thread
(rename-out
[make-thread thread]
[make-channel channel])
thread?
channel?
channel-put
channel-get
channel-try-get
thread-process-wait)
;; value returned to a client that creates a thread; `id` is a fresh
;; uninterned symbol
(struct thread (id))
;; client-visible channel handle; `id` is the key under which the
;; channel's `ch` record is stored in the scheduler state
(struct channel (id))
;; `hd`/`tl` queue the values that were put and not yet received, and
;; `w-hd`/`w-tl` queue the continuations of blocked receivers; in each
;; pair, new items are consed onto the tail list, which is reversed into
;; the head list when the head list is empty
(struct ch (hd tl w-hd w-tl)) ; channel state: queues of values and waiters
;; a `state` represents the state of the thread scheduler, such as
;; enqueued threads and channel content; from a client perspective,
;; channels offer a form of state among communicating threads; this
;; state is implemented through delimited continuations (i.e., in the
;; style of effect handlers)
;; `hd` and `tl` together form the queue of runnable threads: the
;; scheduler takes thunks from `hd`, new thunks are consed onto `tl`,
;; and `tl` is reversed into `hd` when `hd` is empty
(struct state (hd ; list of thunks, next to run first
tl ; list of thunks, most recently enqueued first
channels ; channel-id -> channel state (a `ch` record)
processes)) ; list of (cons (list handle ...) k); k receives the finished handle
;; state requests from threads, distinct from anything else a thread
;; might return:
;; - `state-get-msg`: the scheduler resumes `k` with the current state
;; - `state-put-msg`: the scheduler adopts `state` as the current state
;;   and resumes `k` with void
(struct state-get-msg (k))
(struct state-put-msg (state k))
;; prompt tag used for every prompt that the scheduler installs around a
;; running thread; it is also how `check-in-thread` detects a thread
;; context (an uninterned symbol, so presumably it cannot coincide with
;; a tag chosen by client code)
(define thread-tag (string->uninterned-symbol "thread"))
;; raises an error on behalf of `who` unless the caller is running
;; inside a prompt tagged with `thread-tag`, i.e., inside a thread
(define (check-in-thread who)
  (if (continuation-prompt-available? thread-tag)
      (void)
      (error (~a who ": not in a thread context"))))
;; a request is issued by discarding the current continuation:
;; `empty-k` is a continuation captured immediately inside a
;; `thread-tag` prompt, before any other work has been done there; a
;; thread calls `(empty-k v)` to hand `v` to the scheduler
;; NOTE(review): this relies on Zuo replacing the continuation up to the
;; nearest prompt when a captured continuation is applied -- confirm
;; against the Zuo reference
(define empty-k (call/prompt (lambda () (call/cc (lambda (k) k))) thread-tag))
;; runs thunk as a thread in a new scheduler; any created threads or
;; channels are specific to the scheduler; returns the result of the
;; main thread when no threads can run; waits on processes only if
;; there's nothing else to do
;;
;; `thunk` must be a procedure of no arguments; an error is raised if
;; every thread has finished or blocked and the main thread has not
;; delivered a result
(define (call-in-main-thread thunk)
  (unless (procedure? thunk)
    (arg-error 'call-in-main-thread "procedure" thunk))
  (raw-make-channel ; make a channel to hold the main thread's result
   (state '() '() (hash) '())
   (lambda (chl st)
     ;; for the scheduler loop, it's convenient to break out the head
     ;; thunk of the thread queue, which starts out as being the main
     ;; thread's thunk
     (let loop ([st st] [hd-thunk (lambda () (channel-put chl (thunk)))])
       (cond
         [hd-thunk
          (let ([v (call/prompt hd-thunk thread-tag)])
            ;; the thread either made a state request or has terminated
            (cond
              [(state-get-msg? v)
               ;; answer the request by resuming the thread with the state
               (loop st (lambda () ((state-get-msg-k v) st)))]
              [(state-put-msg? v)
               ;; adopt the thread's new state, then resume the thread
               (loop (state-put-msg-state v) (lambda () ((state-put-msg-k v) (void))))]
              [else (loop st #f)]))]
         [(pair? (state-hd st))
          ;; run the next queued thread
          (loop (state-set-hd st (cdr (state-hd st))) (car (state-hd st)))]
         [(pair? (state-tl st))
          ;; head list is used up: move the tail list over in FIFO order
          (loop (state-set-hd (state-set-tl st '()) (reverse (state-tl st))) #f)]
         [(pair? (state-processes st))
          ;; nothing is runnable: wait on all registered handles at once,
          ;; then resume the thread whose registration includes the
          ;; handle that `process-wait` returned
          (let* ([ps+ks (state-processes st)]
                 [p (apply process-wait (apply append (map car ps+ks)))]
                 [ps+k (ormap (lambda (ps+k) (and (member p (car ps+k)) ps+k))
                              ps+ks)]
                 [st (state-set-processes st (filter (lambda (e) (not (eq? e ps+k)))
                                                     ps+ks))])
            (loop st (lambda () ((cdr ps+k) p))))]
         [else
          ;; no threads and no processes remain: the main thread's result
          ;; must be in its channel by now, or else nothing can progress
          (raw-channel-get st chl
                           (lambda (v st) v)
                           (lambda (st)
                             (error "call-in-main-thread: main thread is stuck")))])))))
;; returns a state like `st`, but with `thunk` added to the tail list
;; of the thread queue
(define (enqueue-thread st thunk)
  (let ([pending (state-tl st)])
    (state-set-tl st (cons thunk pending))))
;; creates a channel with empty value and waiter queues, registers it in
;; `st` under a fresh uninterned symbol, and calls `k` with the channel
;; handle and the extended state
(define (raw-make-channel st k)
  (let* ([key (string->uninterned-symbol "ch")]
         [empty-ch (ch '() '() '() '())]
         [registry (hash-set (state-channels st) key empty-ch)])
    (k (channel key)
       (state-set-channels st registry))))
;; with no arguments, asks the scheduler for its state and returns it;
;; with one argument, asks the scheduler to adopt that state
(define (current-state . st)
  ;; captures the caller's continuation, wraps it in a message built by
  ;; `make-msg`, and hands the message to the scheduler via `empty-k`
  (define (request make-msg)
    (call/cc
     (lambda (resume-k)
       (empty-k (make-msg resume-k)))))
  (cond
    [(null? st)
     (request (lambda (resume-k) (state-get-msg resume-k)))]
    [(null? (cdr st))
     (let ([new-st (car st)])
       (request (lambda (resume-k) (state-put-msg new-st resume-k))))]
    [else (arity-error 'current-state st)]))
;; gives control back to the scheduler with a value that is not a state
;; request; the caller must already have registered its continuation
;; with a channel or a process wait if it is ever to run again
(define (yield)
  (let ([no-request (void)])
    (empty-k no-request)))
;; creates a thread that will call `thunk` and returns a handle for it;
;; the procedure is bound as `thread` locally, matching the name under
;; which it is exported; since `let` is not recursive, `thread` inside
;; the body still refers to the struct constructor
(define make-thread
  (let ([thread
         (lambda (thunk)
           (if (procedure? thunk)
               (void)
               (arg-error 'thread "procedure" thunk))
           (check-in-thread 'thread)
           (let* ([handle (thread (string->uninterned-symbol "thread"))]
                  [st (current-state)])
             ;; queue the new thread; it runs when the scheduler gets to it
             (current-state (enqueue-thread st (lambda () (thunk))))
             handle))])
    thread))
;; creates a channel in the running scheduler and returns its handle;
;; the procedure is bound as `channel` locally, matching the name under
;; which it is exported
(define make-channel
  (let ([channel
         (lambda ()
           (check-in-thread 'channel)
           (let ([st (current-state)])
             (raw-make-channel
              st
              (lambda (new-chl new-st)
                ;; publish the state that includes the new channel
                (current-state new-st)
                new-chl))))])
    channel))
;; puts `v` into `chl` without blocking and returns void: if a receiver
;; is waiting, it is enqueued as a thread that resumes with `v`;
;; otherwise `v` is added to the channel's queue of values
;;
;; errors if `chl` is not a channel, if called outside a thread, or if
;; `chl` is not registered in the running scheduler's state
(define (channel-put chl v)
  (unless (channel? chl) (arg-error 'channel-put "channel" chl))
  (check-in-thread 'channel-put)
  (let* ([st (current-state)]
         ;; reuse `st` instead of asking the scheduler for the state again
         [chs (state-channels st)]
         [ch (hash-ref chs (channel-id chl) #f)])
    ;; report the offending channel; `ch` is always #f at this point
    (unless ch (error "channel-put: does not belong to the running thread group" chl))
    (define (update-ch st ch) (state-set-channels st (hash-set chs (channel-id chl) ch)))
    (let loop ([ch ch])
      (let ([w-hd (ch-w-hd ch)])
        (cond
          [(pair? w-hd)
           ;; hand the value to the longest-waiting receiver
           (let* ([waiter (car w-hd)]
                  [st (update-ch st (ch-set-w-hd ch (cdr w-hd)))])
             (current-state (enqueue-thread st (lambda () (waiter v))))
             (void))]
          [else
           (let ([w-tl (ch-w-tl ch)])
             (cond
               [(null? w-tl)
                ;; nobody is waiting: queue the value
                (current-state (update-ch st (ch-set-tl ch (cons v (ch-tl ch)))))
                (void)]
               [else
                ;; move the waiters' tail list to the head list and retry
                (loop (ch-set-w-tl (ch-set-w-hd ch (reverse w-tl)) '()))]))])))))
;; state-passing core of channel receive
;;
;; - if `chl` holds a value, calls `k` with the oldest value and a state
;;   in which that value is removed
;; - if `chl` is empty and `just-try?` is true, calls `yield-k` with `st`
;; - if `chl` is empty and `just-try?` is false, captures the current
;;   continuation, adds it to the channel's waiters, and calls `yield-k`
;;   with the resulting state; the continuation receives the value when
;;   a later `channel-put` delivers one
;;
;; errors if `chl` is not registered in `st`
(define (raw-channel-get st chl k yield-k [just-try? #f])
  (let* ([chs (state-channels st)]
         [ch (hash-ref chs (channel-id chl) #f)])
    ;; report the offending channel; `ch` is always #f at this point
    (unless ch (error "channel-get: does not belong to the running thread group" chl))
    (define (update-ch st ch) (state-set-channels st (hash-set chs (channel-id chl) ch)))
    (let loop ([ch ch])
      (let ([hd (ch-hd ch)])
        (cond
          [(pair? hd)
           (k (car hd)
              (update-ch st (ch-set-hd ch (cdr hd))))]
          [else
           (let* ([tl (ch-tl ch)])
             (cond
               [(null? tl)
                (cond
                  [just-try?
                   (yield-k st)]
                  [else
                   (call/cc
                    (lambda (resume-k)
                      ;; register as a waiter before giving up control
                      (yield-k (update-ch st (ch-set-w-tl ch (cons resume-k (ch-w-tl ch)))))))])]
               [else
                ;; move the values' tail list to the head list and retry
                (loop (ch-set-tl (ch-set-hd ch (reverse tl)) '()))]))])))))
;; shared implementation of `channel-get` and `channel-try-get`
;;
;; `who` names the public function for error reporting; `unavailable`
;; is called with no arguments, after the state is stored back, when
;; the channel has no value; `just-try?` is passed on to
;; `raw-channel-get` to select non-blocking behavior
(define (do-channel-get who chl unavailable just-try?)
  ;; use `who` so that errors name the function that was actually called
  (unless (channel? chl) (arg-error who "channel" chl))
  (check-in-thread who)
  (raw-channel-get (current-state) chl
                   (lambda (v st)
                     (current-state st)
                     v)
                   (lambda (st)
                     (current-state st)
                     (unavailable))
                   just-try?))
;; receives the oldest value from `chl`; when the channel is empty, the
;; thread yields after being registered as a waiter
(define (channel-get chl)
  (let ([when-empty yield]
        [just-try? #f])
    (do-channel-get 'channel-get chl when-empty just-try?)))
;; like `channel-get`, but returns #f instead of waiting when `chl` has
;; no value
(define (channel-try-get chl)
  (let ([when-empty (lambda () #f)])
    (do-channel-get 'channel-try-get chl when-empty #t)))
;; suspends the current thread until the scheduler's `process-wait`
;; returns one of the given handles, and returns that handle; other
;; threads can run in the meantime
(define (thread-process-wait p . ps)
  (let ([handles (cons p ps)])
    ;; validate every argument before touching the scheduler
    (let check ([hs handles])
      (cond
        [(null? hs) (void)]
        [(handle? (car hs)) (check (cdr hs))]
        [else (arg-error 'thread-process-wait "handle" (car hs))]))
    (check-in-thread 'thread-process-wait)
    (call/cc
     (lambda (resume-k)
       (let* ([st (current-state)]
              [entry (cons handles resume-k)])
         ;; register the handles with this continuation, then give up control
         (current-state (state-set-processes st (cons entry (state-processes st))))
         (yield))))))
|