require 'concurrent-edge'
# This slightly more complicated, commented example aims to
# demonstrate some of the capabilities of the new concurrent-ruby abstractions.
#
# It is a concurrent processing pipeline which on one side has several web crawlers.
# They are searching the web for data and filling buffer.
# On the other side there are data processors which pop the data from the buffer.
# They are processing the data and storing results into a DB
# which has limited concurrency level.
# Some of the parts like Web and DB are just stubs.
# Each part logs and increments counters to keep some stats about the pipeline.
# A periodic readout of the stats into the log is scheduled as well.
#
# Schema of the pipeline:
#
# web-crawlers -> buffer -> data-processing -> DB
# \____________________________\_____\___> logging
# TODO (pitr-ch 10-Mar-2019): replace with a better more realistic example using
# * actors for limited concurrency with state - local DB connection
# * throttled futures for REST API - limiting server load
# The central logger is defined first.
# It has state like the logger instance, therefore the actor is used.
# It is better to exactly define the communication protocol of the logging actor.
# It will only understand these messages.
# Message asking the logging actor to write +message+ with the given +severity+.
Log = Concurrent::ImmutableStruct.new(:severity, :message)
# Message asking the logging actor to switch to a new log +level+.
SetLevel = Concurrent::ImmutableStruct.new(:level)
require 'logger'
require 'stringio'
# Bring the actor environment constants (ANY etc.) into this scope.
include Concurrent::ErlangActor::EnvironmentConstants

# The logging actor. It does not need a dedicated thread, so it runs on a pool.
# It is spawned with the initial log level as its only argument.
LOGGING = Concurrent::ErlangActor.spawn(Logger::FATAL, type: :on_pool, name: 'logger') do |level|
  # A Logger instance writing to $captured_out, starting at the spawned level.
  @logger = Logger.new($captured_out)
  @logger.level = level

  # Nicer formatting: strings are printed as they are, exceptions with their
  # class and backtrace, anything else through #inspect.
  @logger.formatter = lambda do |severity, datetime, progname, msg|
    body =
      case msg
      when String then msg
      when Exception
        format "%s (%s)\n%s", msg.message, msg.class, (msg.backtrace || []).join("\n")
      else msg.inspect
      end
    timestamp = datetime.strftime('%Y-%m-%d %H:%M:%S.%L')
    format "[%s] %5s -- %s: %s\n", timestamp, severity, progname, body
  end

  # The protocol of the actor, one handler per understood message.
  write_entry  = on(Log) { |entry| @logger.log entry.severity, entry.message }
  change_level = on(SetLevel) { |request| @logger.level = request.level }
  # It is a good practice to read and log bad messages,
  # otherwise they would accumulate in the inbox.
  report_unknown = on(ANY) { |unknown| @logger.error bad_message: unknown }

  # The logger has static behaviour, therefore keep: true is used and the actor
  # handles every received message with the same set of handlers.
  receive(write_entry, change_level, report_unknown, keep: true)
end
# Test that the logger works as expected.
# The actor was spawned with Logger::FATAL as its level, so the first INFO
# message (:wind) is presumably filtered out, while the INFO message sent
# after SetLevel[Logger::DEBUG] (:breeze) should be written.
LOGGING.tell Log[Logger::FATAL, :tornado]
LOGGING.tell Log[Logger::INFO, :wind]
LOGGING.tell SetLevel[Logger::DEBUG]
LOGGING.tell Log[Logger::INFO, :breeze]
sleep 0.05 # the logging is asynchronous, we need to wait a bit until it's written
# NOTE(review): get_captured_output (and $captured_out used by the logger) are
# not defined in this file; presumably the documentation harness provides them - confirm.
get_captured_output
# Convenience wrapper around the logging actor.
# Builds a Log message and hands it to LOGGING without waiting for it to be written.
#
# @param severity [Integer] a Logger severity, e.g. Logger::INFO
# @param message [Object] what should be logged
# @return [true] always
def log(severity, message)
  entry = Log[severity, message]
  LOGGING.tell(entry)
  true
end
# Mix the Logger severity constants (DEBUG, INFO, ...) into this scope
# so they can be passed to `log` without the Logger:: prefix.
include Logger::Severity
# Try the `log` helper defined above.
log INFO, 'alive'
sleep 0.05 # logging is asynchronous, give the actor a moment to write the message
# NOTE(review): get_captured_output is not defined in this file;
# presumably the documentation harness provides it - confirm.
get_captured_output
# Stub standing in for the web.
module Web
  # Shared counter of searches performed so far.
  @counter = Concurrent::AtomicFixnum.new

  class << self
    # Pretends to search the web: takes a short while and returns
    # the incremented search counter formatted as a hexadecimal string.
    #
    # @return [String]
    def search
      sleep 0.01
      hit_number = @counter.increment
      hit_number.to_s(16)
    end
  end
end
# The cancellation used to cancel the whole processing pipeline;
# it is triggered later by resolving `origin`.
@cancellation, origin = Concurrent::Cancellation.new

# Buffer for work, shared by crawlers and data processors.
buffer_capacity = 10
@buffer = Concurrent::Promises::Channel.new(buffer_capacity)

web_crawler_count = 4

# Track the amount of data provided by each crawler.
# The counters are accessed by multiple threads, so they are thread-safe counters.
# The array itself is frozen, which makes it immutable and
# therefore safe to use when accessed concurrently.
# If it were being modified it would have to be a Concurrent::Array to be safe.
crawler_data_counter = Array.new(web_crawler_count) { Concurrent::AtomicFixnum.new }.freeze
# The web crawlers are defined directly with threads to start the example simply.
# They search the web and as soon as they find something they push
# the data into the buffer.
# The push blocks while the buffer is full,
# which regulates how fast new work is found.
# This is called backpressure.
crawlers = Array.new(web_crawler_count) do |i|
  Thread.new do
    # crawl the web until cancelled
    until @cancellation.canceled?
      data = Web.search
      # It is a good practice to use timeouts on all blocking operations.
      # If the pipeline is cancelled and the data processors finish
      # before taking the data from the buffer, a crawler could otherwise
      # get stuck on this push.
      # With a timeout, push returns true when the data was pushed
      # and false when it was not pushed in time.
      pushed = false
      loop do
        pushed = @buffer.push(data, 0.1)
        break if pushed || @cancellation.canceled?
      end
      # Count and report the data only when it really reached the buffer.
      # Data dropped because of cancellation is neither counted nor logged,
      # so the crawler stats stay consistent with what the processors can consume.
      next unless pushed

      crawler_data_counter[i].increment
      log DEBUG, "crawler #{i} found #{data}"
    end
  end
end.freeze
# So far only the crawlers looking for data are defined
# pushing data into the buffer.
# The data processing definition follows.
# This time threads are not used directly; instead the data processing
# is defined using Futures.
# Even though that makes the definition more complicated,
# it has the big advantage that the data processors do not require a Thread each
# but share and run on a Thread pool.
# That removes an important limitation on the total number of threads a process can have,
# which can be an issue in larger systems.
# This example would work fine with plain Threads,
# however it would then not demonstrate the more advanced usage.
# The data processing stores results in a DB,
# therefore the stub definition of a database precedes the data processing.
# Stub of a database which keeps a counter per character.
module DB
  # Thread-safe storage of the counters, keyed by character.
  @data = Concurrent::Map.new

  class << self
    # Adds +count+ to the counter stored for +char+,
    # starting from 0 when nothing is stored for it yet.
    #
    # @return [true] always
    def add(char, count)
      @data.compute(char) { |current| (current || 0) + count }
      true
    end

    # Returns the stored data copied into a plain Hash.
    #
    # @return [Hash]
    def data
      snapshot = {}
      @data.each_pair { |key, value| snapshot[key] = value }
      snapshot
    end
  end
end
# Let's assume that, instead of this stub DB,
# we have a limited number of connections
# and therefore there is a limit on
# how many threads can communicate with the DB at the same time.
# The throttle limits the number of concurrent accesses to the DB.
@db_throttle = Concurrent::Throttle.new(4)

# Number of data processors; this could actually be thousands if required.
data_processing_count = 20

# Track the amount of data received by each data processor.
@data_processing_counters = Array.new(data_processing_count) { Concurrent::AtomicFixnum.new }.freeze
# Defines one step of the data processor number +i+ as a chain of futures:
# take a message from the buffer, find its most frequent character,
# store the result in the DB and then either stop or continue with another step.
#
# @param i [Integer] index of the data processor, used for logging and counters
# @return the last future of the chain; its value is :done after cancellation,
#   otherwise it is the future of the next step
def data_processing(i)
  # pop_op returns a future which is fulfilled with a message from the buffer
  # when a message is available.
  message = @buffer.pop_op

  # The message is processed on the :fast pool since this part has no blocking.
  most_frequent = message.then_on(:fast) do |data|
    log DEBUG, "data-processor #{i} got #{data}"
    @data_processing_counters[i].increment
    sleep 0.1 # simulate it actually doing something which takes some time
    # find the most frequent char, the result is a [char, count] pair
    occurrences = data.chars.group_by { |c| c }
    counts = occurrences.map { |c, same| [c, same.size] }
    counts.max_by { |_, n| n }
  end

  # The DB access has to be limited, therefore the db_throttle is used.
  # DBs use IO, therefore this part is executed on the global thread pool for :io.
  stored = most_frequent.then_on(@db_throttle.on(:io)) { |char, count| DB.add char, count }

  # The last section executes back on the :fast executor.
  # It checks whether the pipeline was cancelled; if not, it calls itself recursively,
  # which in combination with #run turns this into infinite data processing
  # (until cancelled).
  # #run keeps flattening to the inner future as long as the value is a future,
  # so returning anything other than a future (:done) stops the execution.
  stored.then_on(:fast) do |_|
    @cancellation.canceled? ? :done : data_processing(i)
  end
end
# Create and start the data processors, one running future chain per index.
data_processors = Array.new(data_processing_count) { |i| data_processing(i).run }
# Some statistics are collected in crawler_data_counter
# and @data_processing_counters.
# This schedules a periodical readout of them into the log.
#
# @param crawler_data_counter [Array] the per-crawler counters to report
# @return a future of one readout step; its value is the future of the next
#   step, or nil once the pipeline was cancelled
def readout(crawler_data_counter)
  # fires in 0.4 sec or on cancellation, whichever comes first
  trigger = @cancellation.origin | Concurrent::Promises.schedule(0.4)

  reported = trigger.then do
    found    = crawler_data_counter.map(&:value).join(', ')
    consumed = @data_processing_counters.map(&:value).join(', ')
    log INFO, "\ncrawlers found: #{found}\ndata processors consumed: #{consumed}"
  end

  # reschedule if not cancelled
  reported.then do
    @cancellation.canceled? ? nil : readout(crawler_data_counter)
  end
end
# Start the periodical readouts; #run keeps following the futures
# returned by readout until it returns something else than a future.
readouts = readout(crawler_data_counter).run
sleep 2 # let the whole processing pipeline work
# cancel everything by resolving the origin of the cancellation
origin.resolve
# wait for everything to stop
crawlers.each(&:join)
# NOTE(review): the [0..10] slice only shortens the returned array, whose value
# is not used here; presumably it limits the output shown in the docs - confirm.
data_processors.each(&:wait!)[0..10]
readouts.wait!
# terminate the logger and wait until the actor has terminated
Concurrent::ErlangActor.terminate LOGGING, :cancelled
LOGGING.terminated.wait
# inspect collected char frequencies
DB.data
# see the logger output
# NOTE(review): get_captured_output is not defined in this file;
# presumably the documentation harness provides it - confirm.
get_captured_output
# End of the example.