1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
;;; Copyright (c) 2011-2012, James M. Lawrence. All rights reserved.
;;;
;;; Redistribution and use in source and binary forms, with or without
;;; modification, are permitted provided that the following conditions
;;; are met:
;;;
;;; * Redistributions of source code must retain the above copyright
;;; notice, this list of conditions and the following disclaimer.
;;;
;;; * Redistributions in binary form must reproduce the above
;;; copyright notice, this list of conditions and the following
;;; disclaimer in the documentation and/or other materials provided
;;; with the distribution.
;;;
;;; * Neither the name of the project nor the names of its
;;; contributors may be used to endorse or promote products derived
;;; from this software without specific prior written permission.
;;;
;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
;;; "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
;;; LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
;;; A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
;;; HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
;;; SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
;;; LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
;;; DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
;;; THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
;;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
;;; OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
(defpackage #:lparallel.kernel-util
(:documentation
"(semi-private) Abstracts some common patterns for submitting and
receiving tasks. This probably won't change, but no guarantees.")
(:use #:cl
#:lparallel.util
#:lparallel.kernel
#:lparallel.queue)
(:export #:with-submit-counted
#:submit-counted
#:receive-counted)
(:export #:with-submit-indexed
#:submit-indexed
#:receive-indexed)
(:export #:with-submit-cancelable
#:submit-cancelable
#:receive-cancelables)
(:import-from #:lparallel.kernel
#:*worker*
#:steal-work
#:channel-kernel))
(in-package #:lparallel.kernel-util)
(defun steal-until-receive-result (channel worker fn)
(declare #.*normal-optimize*)
(loop
(multiple-value-bind (result presentp) (try-receive-result channel)
(when presentp
(when fn
(locally (declare (type function fn))
(funcall fn result)))
(return)))
(steal-work (channel-kernel channel) worker)))
(defun receive-results (channel count fn)
(declare #.*normal-optimize*)
(let ((worker *worker*))
(if worker
(repeat count
(steal-until-receive-result channel worker fn))
(if fn
(do-fast-receives (result channel count)
(locally (declare (type function fn))
(funcall fn result)))
(do-fast-receives (result channel count)
(declare (ignore result)))))))
(defmacro with-submit-counted (&body body)
(with-gensyms (count channel)
`(let ((,count 0)
(,channel (make-channel)))
(declare (fixnum ,count))
(flet ((submit-counted (&rest args)
(declare (dynamic-extent args))
(apply #'submit-task ,channel args)
(incf ,count))
(receive-counted ()
(receive-results ,channel ,count nil)))
(declare (inline submit-counted receive-counted))
,@body))))
(defun indexing-wrapper (array index function args)
(setf (aref array index) (apply function args)))
(defmacro/once with-submit-indexed (&once count &once array &body body)
(with-gensyms (channel)
`(let ((,channel (make-channel)))
(flet ((submit-indexed (index function &rest args)
(submit-task
,channel #'indexing-wrapper ,array index function args))
(receive-indexed ()
(receive-results ,channel ,count nil)
,array))
(declare (inline submit-indexed receive-indexed))
,@body))))
(defmacro with-submit-cancelable (&body body)
(with-gensyms (canceledp channel count)
`(let ((,canceledp nil)
(,count 0)
(,channel (make-channel)))
(flet ((submit-cancelable (fn &rest args)
(submit-task ,channel
(lambda ()
(if ,canceledp
'task-canceled
(apply fn args))))
(incf ,count)))
(macrolet ((receive-cancelables (result &body body)
`(receive-results
,',channel ,',count (lambda (,result) ,@body))))
(unwind-protect (progn ,@body)
(setf ,canceledp t)))))))
|