File: thread.zuo

package info (click to toggle)
zuo 1.12-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,352 kB
  • sloc: ansic: 6,374; makefile: 39
file content (211 lines) | stat: -rw-r--r-- 7,799 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#lang zuo/base

(provide call-in-main-thread
         (rename-out
          [make-thread thread]
          [make-channel channel])
         thread?
         channel?
         channel-put
         channel-get
         channel-try-get
         thread-process-wait)

(struct thread (id))

(struct channel (id))
(struct ch (hd tl w-hd w-tl)) ; channel state: queues of values and waiters

;; a `state` represents the state of the thread scheduler, such as
;; enqeued threads and channel contentl from a client perspective,
;; channels offer a form of state among communicating threadsl this
;; state is implemented through delimited continuations (i.e., in the
;; style of effect handlers)
(struct state (hd          ; list of thunks
               tl          ; list of thunks
               channels    ; channel-id -> channel
               processes)) ; list of (cons (list handle ...) k)

;; state requests from threads, distinct from anything else a thread
;; might return:
(struct state-get-msg (k))
(struct state-put-msg (state k))

(define thread-tag (string->uninterned-symbol "thread"))
(define (check-in-thread who)
  (unless (continuation-prompt-available? thread-tag)
    (error (~a who ": not in a thread context"))))

;; a request is issued by discarding the current continuation
(define empty-k (call/prompt (lambda () (call/cc (lambda (k) k))) thread-tag))

;; runs thunk as a thread in a new scheduler; any created threads or
;; channels are specific to the scheduler; returns the result of the
;; main thread when no threads can run; waits on processes only if
;; there's nothing else to do
(define (call-in-main-thread thunk)
  (raw-make-channel ; make a channel to hold the main thread's result
   (state '() '() (hash) '())
   (lambda (chl st)
     ;; for the scheduler loop, it's convenient to break out the head
     ;; thunk of the thread queue, which starts out as being the main
     ;; thread's thunk
     (let loop ([st st] [hd-thunk (lambda () (channel-put chl (thunk)))])
       (cond
         [hd-thunk
          (let ([v (call/prompt hd-thunk thread-tag)])
            ;; the thread either made a state request or has terminated
            (cond
              [(state-get-msg? v)
               (loop st (lambda () ((state-get-msg-k v) st)))]
              [(state-put-msg? v)
               (loop (state-put-msg-state v) (lambda () ((state-put-msg-k v) (void))))]
              [else (loop st #f)]))]
         [(pair? (state-hd st))
          (loop (state-set-hd st (cdr (state-hd st))) (car (state-hd st)))]
         [(pair? (state-tl st))
          (loop (state-set-hd (state-set-tl st '()) (reverse (state-tl st))) #f)]
         [(pair? (state-processes st))
          (let* ([ps+ks (state-processes st)]
                 [p (apply process-wait (apply append (map car ps+ks)))]
                 [ps+k (ormap (lambda (ps+k) (and (member p (car ps+k)) ps+k))
                              ps+ks)]
                 [st (state-set-processes st (filter (lambda (e) (not (eq? e ps+k)))
                                                     ps+ks))])
            (loop st (lambda () ((cdr ps+k) p))))]
         [else
          (raw-channel-get st chl
                           (lambda (v st) v)
                           (lambda (st)
                             (error "call-in-thread: main thread is stuck")))])))))

(define (enqueue-thread st thunk)
  (state-set-tl st (cons thunk (state-tl st))))

(define (raw-make-channel st k)
  (let ([id (string->uninterned-symbol "ch")])
    (k (channel id)
       (state-set-channels st (hash-set (state-channels st)
                                        id
                                        (ch '() '() '() '()))))))

;; gets or sets the state
(define (current-state . st)
  (cond
    [(null? st)
     (call/cc
      (lambda (k)
        (empty-k (state-get-msg k))))]
    [(null? (cdr st))
     (call/cc
      (lambda (k)
        (empty-k (state-put-msg (car st) k))))]
    [else (arity-error 'current-state st)]))

;; suspends the current thread; it must have been enqueued with
;; a process or channel if it's going to be resumed
(define (yield)
  (empty-k (void)))

(define make-thread
  (let ([thread
         (lambda (thunk)
           (unless (procedure? thunk)
             (arg-error 'thread "procedure" thunk))
           (check-in-thread 'thread)
           (define th (thread (string->uninterned-symbol "thread")))
           (let ([st (current-state)])
             (current-state (enqueue-thread st (lambda () (thunk)))))
           th)])
    thread))

(define make-channel
  (let ([channel
         (lambda ()
           (check-in-thread 'channel)
           (raw-make-channel
            (current-state)
            (lambda (ch st)
              (current-state st)
              ch)))])
    channel))

(define (channel-put chl v)
  (unless (channel? chl) (arg-error 'channel-put "channel" chl))
  (check-in-thread 'channel-put)
  (let* ([st (current-state)]
         [chs (state-channels (current-state))]
         [ch (hash-ref chs (channel-id chl) #f)])
    (unless ch (error "channel-put: does not belong to the running thread group" ch))
    (define (update-ch st ch) (state-set-channels st (hash-set chs (channel-id chl) ch)))
    (let loop ([ch ch])
      (let ([w-hd (ch-w-hd ch)])
        (cond
          [(pair? w-hd)
           (let ([waiter (car (ch-w-hd ch))]
                 [ch (ch-set-w-hd ch (cdr (ch-w-hd ch)))])
             (let* ([st (update-ch st ch)])
               (current-state (enqueue-thread st (lambda () (waiter v))))
               (void)))]
          [else
           (let* ([w-tl (ch-w-tl ch)])
             (cond
               [(null? w-tl)
                (current-state (update-ch st (ch-set-tl ch (cons v (ch-tl ch)))))
                (void)]
               [else
                (loop (ch-set-w-tl (ch-set-w-hd ch (reverse w-tl)) '()))]))])))))

(define (raw-channel-get st chl k yield-k [just-try? #f])
  (let* ([chs (state-channels st)]
         [ch (hash-ref chs (channel-id chl) #f)])
    (unless ch (error "channel-get: does not belong to the running thread group" ch))
    (define (update-ch st ch) (state-set-channels st (hash-set chs (channel-id chl) ch)))
    (let loop ([ch ch])
      (let ([hd (ch-hd ch)])
        (cond
          [(pair? hd)
           (k (car hd)
              (update-ch st (ch-set-hd ch (cdr hd))))]
          [else
           (let* ([tl (ch-tl ch)])
             (cond
               [(null? tl)
                (cond
                  [just-try?
                   (yield-k st)]
                  [else
                   (call/cc
                    (lambda (k)
                      (yield-k (update-ch st (ch-set-w-tl ch (cons k (ch-w-tl ch)))))))])]
               [else
                (loop (ch-set-tl (ch-set-hd ch (reverse tl)) '()))]))])))))

(define (do-channel-get who chl unavailable just-try?)
  (unless (channel? chl) (arg-error 'channel-get "channel" chl))
  (check-in-thread 'channel-get)
  (raw-channel-get (current-state) chl
                   (lambda (v st)
                     (current-state st)
                     v)
                   (lambda (st)
                     (current-state st)
                     (unavailable))
                   just-try?))

(define (channel-get chl)
  (do-channel-get 'channel-get chl yield #f))

(define (channel-try-get chl)
  (do-channel-get 'channel-try-get chl (lambda () #f) #t))

(define (thread-process-wait p . ps)
  (for-each (lambda (p)
              (unless (handle? p) (arg-error 'thread-process-wait "handle" p)))
            (cons p ps))
  (check-in-thread 'thread-process-wait)
  (call/cc
   (lambda (k)
     (let ([st (current-state)])
       (current-state (state-set-processes st (cons (cons (cons p ps) k) (state-processes st))))
       (yield)))))