Class: Concurrent::Actor::Behaviour::Buffer
- Inherits:
- Abstract
- Object
- Abstract
- Concurrent::Actor::Behaviour::Buffer
- Defined in:
- lib/concurrent/actor/behaviour/buffer.rb
Overview
Any message reaching this behaviour is buffered. Only one message is
scheduled at any given time; the others are kept in the buffer until another
one can be scheduled. This effectively means that messages handled by
behaviours placed before the buffer have higher priority: they can be
processed before messages arriving into the buffer. This allows internal
actor messages (such as :link and :supervise) to be processed first.
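The ordering described above can be illustrated with a small, self-contained model. Everything in it (ToyScheduler, ToyInternalHandler, ToyBuffer) is hypothetical and is not part of concurrent-ruby; in particular, ToyBuffer is assumed to defer every message to its own scheduled block, which is a simplification of the overview rather than a copy of the method listings on this page.

```ruby
# Toy model only: none of these classes exist in concurrent-ruby.
class ToyScheduler
  def initialize
    @queue = []
  end

  # Queue a block to run later, in FIFO order.
  def schedule(&block)
    @queue.push(block)
  end

  # Run queued blocks until none remain (blocks may schedule more).
  def run
    @queue.shift.call until @queue.empty?
  end
end

# Stands in for a behaviour placed before the buffer: it handles :link and
# :supervise on arrival and forwards everything else.
class ToyInternalHandler
  INTERNAL = %i[link supervise].freeze

  def initialize(subsequent, log)
    @subsequent = subsequent
    @log = log
  end

  def on_envelope(message)
    if INTERNAL.include?(message)
      @log << [:internal, message]
    else
      @subsequent.on_envelope(message)
    end
  end
end

# Stands in for the buffer: stores messages and handles one per scheduled block.
class ToyBuffer
  def initialize(scheduler, log)
    @scheduler = scheduler
    @log = log
    @buffer = []
    @scheduled = false
  end

  def on_envelope(message)
    @buffer.push(message)
    schedule_next
  end

  private

  # Schedule at most one pending "handle next message" block at a time.
  def schedule_next
    return if @buffer.empty? || @scheduled

    @scheduled = true
    @scheduler.schedule do
      @log << [:user, @buffer.shift]
      @scheduled = false
      schedule_next
    end
  end
end

log = []
scheduler = ToyScheduler.new
chain = ToyInternalHandler.new(ToyBuffer.new(scheduler, log), log)

# Deliveries are themselves scheduled, as if sent by other actors.
%i[work_a work_b link work_c].each do |message|
  scheduler.schedule { chain.on_envelope(message) }
end
scheduler.run

p log
# prints [[:internal, :link], [:user, :work_a], [:user, :work_b], [:user, :work_c]]
```

In this model :link is sent after work_a and work_b, yet it is handled first, because the buffered messages have to wait for their own scheduled block.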
Instance Attribute Summary
- - (undocumented) core (readonly, inherited from Abstract)
- - (undocumented) subsequent (readonly, inherited from Abstract)
Instance Method Summary
- - (Buffer) initialize(core, subsequent, core_options) constructor
  A new instance of Buffer.
- - (undocumented) on_envelope(envelope)
- - (undocumented) on_event(public, event)
- - (undocumented) process_envelope
- - (Boolean) process_envelopes?
  Ensures that only one envelope processing is scheduled with #schedule_execution. This allows other scheduled blocks to be executed before the next envelope is processed.
Constructor Details
- (Buffer) initialize(core, subsequent, core_options)
Returns a new instance of Buffer.
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 12
    def initialize(core, subsequent, core_options)
      super core, subsequent, core_options
      @buffer = []                        # envelopes waiting to be passed on
      @receive_envelope_scheduled = false # true while process_envelope is running
    end
Instance Attribute Details
- (undocumented) core (readonly) Originally defined in class Abstract
- (undocumented) subsequent (readonly) Originally defined in class Abstract
Instance Method Details
- (undocumented) on_envelope(envelope)
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 18
    def on_envelope(envelope)
      @buffer.push envelope   # every incoming envelope is stored first
      process_envelopes?      # start processing unless it is already under way
      MESSAGE_PROCESSED
    end
- (undocumented) on_event(public, event)
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 44
    def on_event(public, event)
      event_name, _ = event
      case event_name
      when :terminated, :restarted
        # envelopes still waiting in the buffer are rejected and dropped
        @buffer.each { |envelope| reject_envelope envelope }
        @buffer.clear
      end
      super public, event_name
    end
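The effect of on_event on pending envelopes can be tried out in isolation with the sketch below. ToyEnvelope and ToyBufferWithEvents are hypothetical names, and the reject_envelope here is only a stand-in that flips a flag; what the library's own reject_envelope does is not shown on this page.

```ruby
# Hypothetical envelope: it only tracks whether it was rejected.
ToyEnvelope = Struct.new(:message, :rejected)

# Hypothetical buffer that keeps envelopes until an event arrives.
class ToyBufferWithEvents
  attr_reader :buffer

  def initialize
    @buffer = []
  end

  def on_envelope(envelope)
    @buffer.push envelope
  end

  # Same case analysis as the listing above, without the call to super.
  def on_event(event)
    event_name, _ = event
    case event_name
    when :terminated, :restarted
      @buffer.each { |envelope| reject_envelope envelope }
      @buffer.clear
    end
  end

  private

  # Stand-in only: marks the envelope instead of notifying anyone.
  def reject_envelope(envelope)
    envelope.rejected = true
  end
end

buffer = ToyBufferWithEvents.new
pending = [ToyEnvelope.new(:a, false), ToyEnvelope.new(:b, false)]
pending.each { |envelope| buffer.on_envelope(envelope) }

buffer.on_event(:paused)      # unrelated event: the buffer is left untouched
still_buffered = buffer.buffer.size

buffer.on_event(:terminated)  # pending envelopes are rejected, then dropped
```

After the :terminated event every envelope in pending has rejected set to true and the buffer is empty, whereas the unrelated :paused event left both envelopes in place.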
- (undocumented) process_envelope
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 35
    def process_envelope
      envelope = @buffer.shift
      return nil unless envelope
      pass envelope
    ensure
      # runs even if pass raises: clear the flag and schedule the next check
      @receive_envelope_scheduled = false
      core.schedule_execution { process_envelopes? }
    end
- (Boolean) process_envelopes?
Ensures that only one envelope processing is scheduled with #schedule_execution. This allows other scheduled blocks to be executed before the next envelope is processed. Simply put, this ensures that Core remains responsive to internal calls (like add_child) even when the Actor is flooded with messages.
    # File 'lib/concurrent/actor/behaviour/buffer.rb', line 28
    def process_envelopes?
      unless @buffer.empty? || @receive_envelope_scheduled
        @receive_envelope_scheduled = true
        process_envelope
      end
    end
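To see how the @receive_envelope_scheduled flag interacts with process_envelope, the two methods can be exercised outside the actor framework. In the sketch below the method bodies follow the listings on this page, but StubCore, SketchBuffer, the handled log and the pass stand-in are assumptions made for illustration; the real core and the real pass (inherited from Abstract) are not reproduced here.

```ruby
# Hypothetical stand-in for the actor core: it only collects scheduled blocks.
class StubCore
  def initialize
    @blocks = []
  end

  def schedule_execution(&block)
    @blocks << block
  end

  # Run scheduled blocks in FIFO order until none remain.
  def run_all
    @blocks.shift.call until @blocks.empty?
  end
end

# Hypothetical class that reuses the buffering logic from the listings.
class SketchBuffer
  attr_reader :handled

  def initialize(core)
    @core = core
    @buffer = []
    @receive_envelope_scheduled = false
    @handled = []
  end

  def on_envelope(envelope)
    @buffer.push envelope
    process_envelopes?
  end

  def process_envelopes?
    unless @buffer.empty? || @receive_envelope_scheduled
      @receive_envelope_scheduled = true
      process_envelope
    end
  end

  def process_envelope
    envelope = @buffer.shift
    return nil unless envelope
    pass envelope
  ensure
    @receive_envelope_scheduled = false
    @core.schedule_execution { process_envelopes? }
  end

  # Stand-in for pass: records the envelope and, while handling :first,
  # feeds another envelope back into the buffer.
  def pass(envelope)
    @handled << envelope
    on_envelope(:nested) if envelope == :first
  end
end

core = StubCore.new
buffer = SketchBuffer.new(core)

buffer.on_envelope(:first)
handled_before_run = buffer.handled.dup  # :nested is still waiting in the buffer

core.run_all                             # the scheduled check picks up :nested
handled_after_run = buffer.handled.dup
```

Because the flag is set while :first is being handled, the :nested envelope that arrives in the meantime is only buffered. It is handled later, from the block that the ensure clause passed to schedule_execution, so any block scheduled on the core before that point would run ahead of it.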