module Concurrent
  module Actor
    module Behaviour

      # Allows actors to be paused on errors.
      # While an actor is paused, all arriving messages are collected and
      # processed after the actor is resumed or reset. Resume simply continues
      # with the next message. Reset also reinitializes the context. Restart
      # reinitializes the context and rejects the collected messages.
      # @note TODO missing example
      class Pausing < Abstract
        def initialize(core, subsequent, core_options)
          super core, subsequent, core_options
          @paused   = false
          @deferred = []
        end

        def paused?
          @paused
        end

        # Handles the control messages itself; any other message is deferred
        # while paused and passed to the next behaviour otherwise.
        def on_envelope(envelope)
          case envelope.message
          when :pause!
            pause!
          when :paused?
            paused?
          when :resume!
            resume!
          when :reset!
            reset!
          when :restart!
            restart!
          else
            if paused?
              @deferred << envelope
              MESSAGE_PROCESSED
            else
              pass envelope
            end
          end
        end

        # Pauses the actor and broadcasts the error, or :paused if none is given.
        def pause!(error = nil)
          do_pause
          broadcast true, error || :paused
          true
        end

        # Resumes processing; returns false if the actor is not paused.
        def resume!
          return false unless paused?
          do_resume
          broadcast(true, :resumed)
          true
        end

        # Rebuilds the context, then resumes and reschedules deferred messages.
        def reset!
          return false unless paused?
          broadcast(false, :resetting)
          do_reset
          broadcast(true, :reset)
          true
        end

        # Rebuilds the context, rejects deferred messages, then resumes.
        def restart!
          return false unless paused?
          broadcast(false, :restarting)
          do_restart
          broadcast(true, :restarted)
          true
        end

        # Rejects all deferred messages when the actor terminates.
        def on_event(public, event)
          event_name, _ = event
          reject_deferred if event_name == :terminated
          super public, event
        end

        private

        def do_pause
          @paused = true
          nil
        end

        def do_resume
          @paused = false
          reschedule_deferred
          nil
        end

        def do_reset
          rebuild_context
          do_resume
          # do_resume has already rescheduled and cleared @deferred,
          # so this call finds nothing left to reschedule.
          reschedule_deferred
          nil
        end

        def do_restart
          rebuild_context
          reject_deferred
          do_resume
          nil
        end

        def rebuild_context
          core.allocate_context
          core.build_context
          nil
        end

        def reschedule_deferred
          @deferred.each { |envelope| core.schedule_execution { core.process_envelope envelope } }
          @deferred.clear
        end

        def reject_deferred
          @deferred.each { |envelope| reject_envelope envelope }
          @deferred.clear
        end
      end
    end
  end
end
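
# Example (a minimal sketch, not part of the library): the listing above still
# carries a "TODO missing example" note. The class below models only the
# defer-while-paused idea in plain Ruby so it can run without an actor system.
# PausableMailbox, deliver, processed and rejected are hypothetical names made
# up for illustration; the real behaviour works on envelopes and schedules
# them through the actor core instead of appending to arrays.

```ruby
# Hypothetical stand-in for the Pausing behaviour; assumes no actor core.
class PausableMailbox
  attr_reader :processed, :rejected

  def initialize
    @paused    = false
    @deferred  = []
    @processed = []
    @rejected  = []
  end

  def paused?
    @paused
  end

  def pause!
    @paused = true
  end

  # Defer the message while paused, process it immediately otherwise.
  def deliver(message)
    if paused?
      @deferred << message
      :deferred
    else
      @processed << message
      :processed
    end
  end

  # Like resume!: replay deferred messages in arrival order.
  def resume!
    return false unless paused?
    @paused = false
    @processed.concat(@deferred)
    @deferred.clear
    true
  end

  # Like restart!: reject deferred messages instead of replaying them.
  def restart!
    return false unless paused?
    @rejected.concat(@deferred)
    @deferred.clear
    @paused = false
    true
  end
end

resumed = PausableMailbox.new
resumed.deliver(:a)
resumed.pause!
resumed.deliver(:b)
resumed.deliver(:c)
resumed.resume!

restarted = PausableMailbox.new
restarted.pause!
restarted.deliver(:x)
restarted.restart!
```

# After this runs, `resumed.processed` holds all three messages in arrival
# order, while `restarted.processed` is empty and `restarted.rejected` holds
# the one message that arrived during the pause.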