File: message-port.jl

package info (click to toggle)
librep 0.17-13
  • links: PTS
  • area: main
  • in suites: etch, etch-m68k
  • size: 5,648 kB
  • ctags: 2,969
  • sloc: ansic: 32,770; lisp: 12,399; sh: 7,971; makefile: 515; sed: 93
file content (109 lines) | stat: -rw-r--r-- 3,358 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#| message-port.jl -- inter-thread communication channels

   $Id: message-port.jl,v 1.3 2001/08/14 02:35:19 jsh Exp $

   Copyright (C) 2001 John Harper <jsh@pixelslut.com>

   This file is part of librep.

   librep is free software; you can redistribute it and/or modify it
   under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   librep is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with librep; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
|#

(define-structure rep.threads.message-port

    (export make-message-port
	    message-port-p
	    message-fetch
	    message-send
	    message-waiting-p)

    (open rep
	  rep.threads
	  rep.threads.mutex
	  rep.threads.condition-variable
	  rep.data.records
	  rep.data.queues)

  (define-record-type :message-port
    (create-port queue mutex condition)
    message-port-p
    (queue port-queue)
    (mutex port-mutex)
    (condition port-condition))

  (define (make-message-port)
    "Create and return a new message port."
    (create-port (make-queue) (make-mutex) (make-condition-variable)))

  (define (message-waiting-p port)
    "Return true if there are messages waiting on message port PORT."
    (obtain-mutex (port-mutex port))
    (unwind-protect
	(not (queue-empty-p (port-queue port)))
      (release-mutex (port-mutex port))))

  (define (message-fetch port #!optional timeout)
    "Fetch the earliest unread message sent to message port PORT. Blocks the
current thread for TIMEOUT milliseconds, or indefinitely if TIMEOUT isn't
defined. Returns the message, or false if no message could be read."
    (obtain-mutex (port-mutex port))
    (unwind-protect
	(let again ((can-wait t))
	  (if (queue-empty-p (port-queue port))
	      (if can-wait
		  (again (condition-variable-wait (port-condition port)
						  (port-mutex port) timeout))
		nil)
	    ;; we have a waiting message
	    (dequeue (port-queue port))))
      (release-mutex (port-mutex port))))

  (define (message-send port message)
    "Send the message MESSAGE (an arbitrary value) to message port PORT."
    (obtain-mutex (port-mutex port))
    (unwind-protect
	(progn
	  (enqueue (port-queue port) message)
	  (condition-variable-signal (port-condition port)))
      (release-mutex (port-mutex port)))))


#| Test function:

  (define (test)

    (let ((port (make-message-port)))

      (define (master)
	(do ((i 0 (1+ i)))
	    ((= i 10))
	  (thread-suspend (current-thread) (random 1000))
	  (let ((data (make-string i (+ (random 10) #\0))))
	    (message-send port data)
	    (format standard-output "master: sent %S\n" data))))

      (define (slave)
	(do ((i 0 (1+ i)))
	    ((= i 10))
	  (thread-suspend (current-thread) (random 1000))
	  (let ((data (message-fetch port)))
	    (format standard-output "slave: received %S\n" data))))

    (call-with-dynamic-root
     (lambda ()
       (random t)
       (make-thread slave "slave")
       (make-thread master "master")))))
|#