1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
|
# frozen_string_literal: true
module Lumberjack
# A buffered logging device that wraps another logging device. Entries are buffered in memory
# until the buffer size is reached or the device is flushed.
#
# @example Create a buffered device that flushes every 5 entries
# device = Lumberjack::Device::Buffer.new(Lumberjack::Device::LogFile.new("logfile.log"), buffer_size: 5)
#
# @example Create a buffered device that automatically flushes every 10 seconds
# device = Lumberjack::Device::Buffer.new("/var/log/app.log", buffer_size: 10, flush_seconds: 10)
#
# @example Create a buffered device with a before_flush callback
# before_flush = -> { puts "Flushing log buffer" }
# device = Lumberjack::Device::Buffer.new(device, buffer_size: 10, before_flush: before_flush)
class Device::Buffer < Device
# Internal class that manages the entry buffer and flushing logic.
class EntryBuffer
attr_accessor :size
attr_reader :device, :last_flushed_at
def initialize(device, size, before_flush)
@device = device
@size = size
@before_flush = before_flush if before_flush.respond_to?(:call)
@lock = Mutex.new
@entries = []
@last_flushed_at = Time.now
@closed = false
end
def <<(entry)
return if closed?
@lock.synchronize do
@entries << entry
end
flush if @entries.size >= @size
end
def flush
entries = nil
if closed?
@before_flush&.call
entries = @entries
@entries = []
else
@lock.synchronize do
@before_flush&.call
entries = @entries
@entries = []
end
end
@last_flushed_at = Time.now
return if entries.nil?
entries.each do |entry|
@device.write(entry)
rescue => e
warn("Error writing log entry from buffer: #{e.inspect}")
end
end
def close
@closed = true
flush
end
def closed?
@closed
end
def reopen
@closed = false
end
def empty?
@entries.empty?
end
end
class << self
private
def create_finalizer(buffer) # :nodoc:
lambda { |object_id| buffer.close }
end
def create_flusher_thread(flush_seconds, buffer) # :nodoc:
Thread.new do
until buffer.closed?
sleep(flush_seconds)
buffer.flush if Time.now - buffer.last_flushed_at >= flush_seconds
end
end
end
end
# Initialize a new buffered logging device that wraps another device.
#
# @param wrapped_device [Lumberjack::Device, String, Symbol, IO] The underlying device to wrap.
# This can be any valid device specification that +Lumberjack::Device.open_device+ accepts.
# Options not related to buffering will be passed to the underlying device constructor.
# @param options [Hash] Options for the buffer and the underlying device.
# @option options [Integer] :buffer_size The number of entries to buffer before flushing. Default is 0 (no buffering).
# @option options [Integer] :flush_seconds If specified, a background thread will flush the buffer every N seconds.
# @option options [Proc] :before_flush A callback that will be called before each flush. The callback should
# respond to +call+ and take no arguments.
def initialize(wrapped_device, options = {})
buffer_options = [:buffer_size, :flush_seconds, :before_flush]
device_options = options.reject { |k, _| buffer_options.include?(k) }
device = Device.open_device(wrapped_device, device_options)
@buffer = EntryBuffer.new(device, options[:buffer_size] || 0, options[:before_flush])
flush_seconds = options[:flush_seconds]
self.class.send(:create_flusher_thread, flush_seconds, @buffer) if flush_seconds.is_a?(Numeric) && flush_seconds > 0
# Add a finalizer to ensure flush is called before the object is destroyed
ObjectSpace.define_finalizer(self, self.class.send(:create_finalizer, @buffer))
end
def buffer_size
@buffer.size
end
# Set the buffer size. The underlying device will only be written to when the buffer size
# is exceeded.
#
# @param [Integer] value The size of the buffer in bytes.
# @return [void]
def buffer_size=(value)
@buffer.size = value
@buffer.flush
end
# Write an entry to the underlying device.
#
# @param [LogEntry, String] entry The entry to write.
# @return [void]
def write(entry)
@buffer << entry
end
# Close the device.
#
# @return [void]
def close
@buffer.close
@buffer.device.close
# Remove the finalizer since we've already flushed
ObjectSpace.undefine_finalizer(self)
end
# Return true if the buffer has been closed.
def closed?
@buffer.closed?
end
# Flush the buffer to the underlying device.
#
# @return [void]
def flush
@buffer.flush
end
# Reopen the underlying device, optionally with a new log destination.
def reopen(logdev = nil)
flush
@buffer.device.reopen(logdev)
@buffer.reopen
ObjectSpace.define_finalizer(self, self.class.send(:create_finalizer, @buffer))
end
# Return the underlying stream. Provided for API compatibility with Logger devices.
#
# @return [IO] The underlying stream.
def dev
@buffer.device.dev
end
# @api private
def last_flushed_at
@buffer.last_flushed_at
end
# @api private
def empty?
@buffer.empty?
end
private
def create_flusher_thread(flush_seconds, buffer) # :nodoc:
Thread.new do
until buffer.closed?
sleep(flush_seconds)
buffer.flush if Time.now - buffer.last_flushed_at >= flush_seconds
end
end
end
end
end
|