1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
# frozen_string_literal: true
require 'listen/monotonic_time'
module Listen
module Event
class Processor
def initialize(config, reasons)
@config = config
@listener = config.listener
@reasons = reasons
_reset_no_unprocessed_events
end
# TODO: implement this properly instead of checking the state at arbitrary
# points in time
def loop_for(latency)
@latency = latency
loop do
event = _wait_until_events
_check_stopped
_wait_until_events_calm_down
_wait_until_no_longer_paused
_process_changes(event)
end
rescue Stopped
Listen.logger.debug('Processing stopped')
end
private
class Stopped < RuntimeError
end
def _wait_until_events_calm_down
loop do
now = MonotonicTime.now
# Assure there's at least latency between callbacks to allow
# for accumulating changes
diff = _deadline - now
break if diff <= 0
# give events a bit of time to accumulate so they can be
# compressed/optimized
_sleep(diff)
end
end
def _wait_until_no_longer_paused
@listener.wait_for_state(*(Listener.states.keys - [:paused]))
end
def _check_stopped
if @listener.stopped?
_flush_wakeup_reasons
raise Stopped
end
end
def _sleep(seconds)
_check_stopped
config.sleep(seconds)
_check_stopped
_flush_wakeup_reasons do |reason|
if reason == :event && !@listener.paused?
_remember_time_of_first_unprocessed_event
end
end
end
def _remember_time_of_first_unprocessed_event
@_remember_time_of_first_unprocessed_event ||= MonotonicTime.now
end
def _reset_no_unprocessed_events
@_remember_time_of_first_unprocessed_event = nil
end
def _deadline
@_remember_time_of_first_unprocessed_event + @latency
end
# blocks until event is popped
# returns the event or `nil` when the event_queue is closed
def _wait_until_events
config.event_queue.pop.tap do |_event|
@_remember_time_of_first_unprocessed_event ||= MonotonicTime.now
end
end
def _flush_wakeup_reasons
until @reasons.empty?
reason = @reasons.pop
yield reason if block_given?
end
end
# for easier testing without sleep loop
def _process_changes(event)
_reset_no_unprocessed_events
changes = [event]
changes << config.event_queue.pop until config.event_queue.empty?
return unless config.callable?
hash = config.optimize_changes(changes)
result = [hash[:modified], hash[:added], hash[:removed]]
return if result.all?(&:empty?)
block_start = MonotonicTime.now
exception_note = " (exception)"
::Listen::Thread.rescue_and_log('_process_changes') do
config.call(*result)
exception_note = nil
end
Listen.logger.debug "Callback#{exception_note} took #{MonotonicTime.now - block_start} sec"
end
attr_reader :config
end
end
end
|