1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
|
require 'forwardable'
require 'concurrent/channel/buffer'
require 'concurrent/channel/selector'
require 'concurrent/maybe'
require 'concurrent/executor/cached_thread_pool'
module Concurrent
# {include:file:docs-source/channel.md}
# @!macro warn.edge
class Channel
# Forwardable provides def_delegators; Enumerable builds on #each.
extend Forwardable
include Enumerable
# Thread pool on which the blocks given to Channel.go and Channel.go_loop
# are posted.
# NOTE: Move to global IO pool once stable
GOROUTINES = Concurrent::CachedThreadPool.new
private_constant :GOROUTINES
# Maps the +:buffer+ option accepted by #initialize to the buffer class
# that is instantiated for it.
BUFFER_TYPES = {
unbuffered: Buffer::Unbuffered,
buffered: Buffer::Buffered,
dropping: Buffer::Dropping,
sliding: Buffer::Sliding
}.freeze
private_constant :BUFFER_TYPES
# Validator used when no +:validator+ option is given: accepts every value.
DEFAULT_VALIDATOR = ->(value){ true }
private_constant :DEFAULT_VALIDATOR
# Base class for the errors raised by channel operations.
Error = Class.new(StandardError)

# Raised when a value offered to a channel is rejected by validation.
class ValidationError < Error
  # @param message [String, nil] description of the failure; falls back to
  #   'invalid value' when omitted
  def initialize(message = nil)
    # The message has to be handed to Exception#initialize; assigning the
    # local alone would leave #message reporting just the class name.
    super(message || 'invalid value')
  end
end
# Size/state queries and #close are forwarded unchanged to the buffer
# returned by the private #buffer reader.
def_delegators :buffer,
:size, :capacity, :close, :closed?,
:blocking?, :empty?, :full?
# #length is an alias of #size and #stop is an alias of #close.
alias_method :length, :size
alias_method :stop, :close
# Creates a channel backed by a buffer chosen from the given options.
#
# With neither +:capacity+ nor +:buffer+ the channel is unbuffered.
# +buffer: :buffered+ together with +capacity: 0+ also yields an unbuffered
# channel. Every other buffer type requires a capacity of at least 1; when
# only a capacity is given the +:buffered+ type is used.
#
# @param opts [Hash, Buffer::Base] the options; a Buffer::Base instance is
#   also accepted (undocumented, internal use) and becomes the buffer as-is
# @option opts [Integer] :capacity buffer capacity
# @option opts [Integer] :size used as the capacity when +:capacity+ is absent
# @option opts [Symbol] :buffer +:unbuffered+, +:buffered+, +:dropping+ or
#   +:sliding+
# @option opts [#call] :validator called with each value passed to the
#   put/offer methods; a falsy result rejects the value
# @raise [ArgumentError] if a capacity is combined with +:unbuffered+, if the
#   capacity is missing or below 1 for a buffered type, or if the buffer type
#   is unknown
def initialize(opts = {})
  # undocumented -- for internal use only
  if opts.is_a? Buffer::Base
    self.buffer = opts
    # A validator is still required on this path: #validate calls it
    # unconditionally, and a nil validator would make every put/offer fail.
    self.validator = DEFAULT_VALIDATOR
    return
  end

  capacity = opts[:capacity] || opts[:size]
  buffer = opts[:buffer]

  if capacity && buffer == :unbuffered
    raise ArgumentError.new('unbuffered channels cannot have a capacity')
  elsif capacity.nil? && buffer.nil?
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity == 0 && buffer == :buffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif buffer == :unbuffered
    self.buffer = BUFFER_TYPES[:unbuffered].new
  elsif capacity.nil? || capacity < 1
    raise ArgumentError.new('capacity must be at least 1 for this buffer type')
  else
    buffer ||= :buffered
    # Reject unknown type symbols explicitly instead of failing later with a
    # NoMethodError on nil.
    buffer_class = BUFFER_TYPES.fetch(buffer) do
      raise ArgumentError.new("unknown buffer type: #{buffer.inspect}")
    end
    self.buffer = buffer_class.new(capacity)
  end

  self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR)
end
# Validates +item+ (nil is rejected) and, when it is valid, passes it to the
# buffer through #do_put.
#
# @param item [Object] the value to put
# @return [false, Object] false when validation fails, otherwise the result
#   of #do_put
def put(item)
  accepted = validate(item, false, false)
  accepted ? do_put(item) : false
end
# #send and #<< are aliases of #put. Because of the #send alias, the
# inherited Object#send is shadowed on channel instances.
alias_method :send, :put
alias_method :<<, :put
# Like #put, but signals failure with exceptions instead of false.
#
# @param item [Object] the value to put
# @return [Object] the truthy result of #do_put
# @raise [StandardError] whatever #validate raises for an invalid item
#   (it is called with raise_error = true)
# @raise [Error] when #do_put returns a falsy result
def put!(item)
  validate(item, false, true)
  do_put(item) || raise(Error)
end
# Variant of #put that reports its outcome as a Concurrent::Maybe.
# Validation is run with allow_nil = true, so nil is not rejected up front.
#
# @param item [Object] the value to put
# @return [Concurrent::Maybe] +nothing('invalid value')+ when validation
#   fails, +just(true)+ when #do_put succeeds, plain +nothing+ otherwise
def put?(item)
  return Concurrent::Maybe.nothing('invalid value') unless validate(item, true, false)

  do_put(item) ? Concurrent::Maybe.just(true) : Concurrent::Maybe.nothing
end
# Validates +item+ (nil is rejected) and, when it is valid, passes it to the
# buffer through #do_offer.
#
# @param item [Object] the value to offer
# @return [false, Object] false when validation fails, otherwise the result
#   of #do_offer
def offer(item)
  if validate(item, false, false)
    do_offer(item)
  else
    false
  end
end
# Like #offer, but signals failure with exceptions instead of false.
#
# @param item [Object] the value to offer
# @return [Object] the truthy result of #do_offer
# @raise [StandardError] whatever #validate raises for an invalid item
#   (it is called with raise_error = true)
# @raise [Error] when #do_offer returns a falsy result
def offer!(item)
  validate(item, false, true)
  accepted = do_offer(item)
  return accepted if accepted

  raise Error
end
# Variant of #offer that reports its outcome as a Concurrent::Maybe.
# Validation is run with allow_nil = true, so nil is not rejected up front.
#
# @param item [Object] the value to offer
# @return [Concurrent::Maybe] +nothing('invalid value')+ when validation
#   fails, +just(true)+ when #do_offer succeeds, plain +nothing+ otherwise
def offer?(item)
  if validate(item, true, false)
    do_offer(item) ? Concurrent::Maybe.just(true) : Concurrent::Maybe.nothing
  else
    Concurrent::Maybe.nothing('invalid value')
  end
end
# Takes an item from the buffer through #do_take.
#
# @return [Object, nil] the item, or nil when #do_take returned the
#   Concurrent::NULL sentinel
def take
  taken = do_take
  return nil if taken == Concurrent::NULL

  taken
end
# #receive and the unary #~ operator are aliases of #take.
alias_method :receive, :take
alias_method :~, :take
# Like #take, but raises instead of returning nil when nothing was taken.
#
# @return [Object] the item returned by #do_take
# @raise [Error] when #do_take returned the Concurrent::NULL sentinel
def take!
  taken = do_take
  return taken unless taken == Concurrent::NULL

  raise Error
end
# Variant of #take that wraps its result in a Concurrent::Maybe.
#
# @return [Concurrent::Maybe] +nothing+ when #do_take returned the
#   Concurrent::NULL sentinel, otherwise +just(item)+
def take?
  taken = do_take
  return Concurrent::Maybe.nothing if taken == Concurrent::NULL

  Concurrent::Maybe.just(taken)
end
# Fetches the next item through #do_next together with the "more" flag that
# accompanies it.
#
# @example
#
#   jobs = Channel.new
#
#   Channel.go do
#     loop do
#       j, more = jobs.next
#       if more
#         print "received job #{j}\n"
#       else
#         print "received all jobs\n"
#         break
#       end
#     end
#   end
#
# @return [Array(Object, Object)] the item (nil when #do_next produced the
#   Concurrent::NULL sentinel) followed by the flag returned by #do_next
def next
  item, more = do_next
  [item == Concurrent::NULL ? nil : item, more]
end
# Variant of #next that wraps the item in a Concurrent::Maybe.
#
# @return [Array(Concurrent::Maybe, Object)] +nothing+ (when #do_next
#   produced the Concurrent::NULL sentinel) or +just(item)+, followed by the
#   flag returned by #do_next
def next?
  item, more = do_next
  wrapped = item == Concurrent::NULL ? Concurrent::Maybe.nothing : Concurrent::Maybe.just(item)
  [wrapped, more]
end
# Polls the buffer through #do_poll.
#
# @return [Object, nil] the item, or nil when #do_poll returned the
#   Concurrent::NULL sentinel
def poll
  polled = do_poll
  polled == Concurrent::NULL ? nil : polled
end
# Like #poll, but raises instead of returning nil when nothing was polled.
#
# @return [Object] the item returned by #do_poll
# @raise [Error] when #do_poll returned the Concurrent::NULL sentinel
def poll!
  polled = do_poll
  polled == Concurrent::NULL ? raise(Error) : polled
end
# Variant of #poll that wraps its result in a Concurrent::Maybe.
#
# @return [Concurrent::Maybe] +nothing+ when #do_poll returned the
#   Concurrent::NULL sentinel, otherwise +just(item)+
def poll?
  polled = do_poll
  return Concurrent::Maybe.just(polled) unless polled == Concurrent::NULL

  Concurrent::Maybe.nothing
end
# Yields each item obtained from #do_next until #do_next reports the
# Concurrent::NULL sentinel together with a falsy "more" flag. A sentinel
# accompanied by a truthy flag is skipped and iteration continues.
#
# @yieldparam item [Object] the next item
# @return [nil]
# @raise [ArgumentError] when called without a block
def each
  raise ArgumentError, 'no block given' unless block_given?

  loop do
    item, more = do_next
    if item != Concurrent::NULL
      yield item
      next
    end
    break unless more
  end
end
class << self
  # Builds a channel on top of a Buffer::Timer created with +seconds+.
  #
  # @param seconds [Object] passed to Buffer::Timer.new
  # @return [Channel]
  def timer(seconds)
    timer_buffer = Buffer::Timer.new(seconds)
    Channel.new(timer_buffer)
  end
  alias_method :after, :timer

  # Builds a channel on top of a Buffer::Ticker created with +interval+.
  #
  # @param interval [Object] passed to Buffer::Ticker.new
  # @return [Channel]
  def ticker(interval)
    ticker_buffer = Buffer::Ticker.new(interval)
    Channel.new(ticker_buffer)
  end
  alias_method :tick, :ticker

  # Yields a fresh Selector (followed by +args+) so the block can configure
  # it, then executes the selector.
  #
  # @return [Object] the result of Selector#execute
  # @raise [ArgumentError] when called without a block
  def select(*args)
    raise ArgumentError, 'no block given' unless block_given?

    chooser = Selector.new
    yield(chooser, *args)
    chooser.execute
  end
  alias_method :alt, :select

  # Posts the block, with +args+, to the shared GOROUTINES pool.
  #
  # @raise [ArgumentError] when called without a block
  def go(*args, &block)
    go_via(GOROUTINES, *args, &block)
  end

  # Posts the block, with +args+, to the given executor.
  #
  # @param executor [#post] receives the arguments and the block
  # @return [Object] the result of +executor.post+
  # @raise [ArgumentError] when called without a block
  def go_via(executor, *args, &block)
    raise ArgumentError, 'no block given' if block.nil?

    executor.post(*args, &block)
  end

  # Same as #go_loop_via, using the shared GOROUTINES pool.
  #
  # @raise [ArgumentError] when called without a block
  def go_loop(*args, &block)
    go_loop_via(GOROUTINES, *args, &block)
  end

  # Posts a task to the executor that calls the block with +args+ over and
  # over until the block returns a falsy value.
  #
  # @param executor [#post] receives the task
  # @return [Object] the result of +executor.post+
  # @raise [ArgumentError] when called without a block
  def go_loop_via(executor, *args, &block)
    raise ArgumentError, 'no block given' if block.nil?

    executor.post(block, *args) do
      loop { break unless block.call(*args) }
    end
  end
end
private

# @return [#call, nil] the validator applied to values by #validate
def validator; @validator; end

# @param value [#call] the validator to store
def validator=(value); @validator = value; end

# @return [Object, nil] the buffer backing this channel
def buffer; @buffer; end

# @param value [Object] the buffer to store
def buffer=(value); @buffer = value; end
# Checks +value+ against the nil rule and the configured validator.
#
# @param value [Object] the value to check
# @param allow_nil [Boolean] when falsy, a nil value is rejected before the
#   validator is consulted
# @param raise_error [Boolean] when truthy, failures raise instead of
#   returning false
# @return [Boolean] true when the value is accepted, false when it is
#   rejected and +raise_error+ is falsy
# @raise [ValidationError] when the value is rejected and +raise_error+ is
#   truthy
# @raise [StandardError] the validator's own exception, re-raised when
#   +raise_error+ is truthy (it is swallowed and reported as false otherwise)
def validate(value, allow_nil, raise_error)
  begin
    if !allow_nil && value.nil?
      raise ValidationError.new('nil is not a valid value') if raise_error
      return false
    end

    return true if validator.call(value)

    raise ValidationError if raise_error
    false
  rescue => failure
    # Reached for the ValidationErrors raised above as well as for any
    # exception raised by the validator itself.
    raise failure if raise_error
    false
  end
end
# Thin wrappers around the buffer operations; each returns whatever the
# buffer returns.

# @param item [Object] forwarded to the buffer's +put+
def do_put(item); buffer.put(item); end

# @param item [Object] forwarded to the buffer's +offer+
def do_offer(item); buffer.offer(item); end

# @return [Object] the result of the buffer's +take+
def do_take; buffer.take; end

# @return [Object] the result of the buffer's +next+
def do_next; buffer.next; end

# @return [Object] the result of the buffer's +poll+
def do_poll; buffer.poll; end
end
end
|