1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
|
# frozen_string_literal: true
# Released under the MIT License.
# Copyright, 2019-2022, by Samuel Williams.
# Copyright, 2020, by Simon Perepelitsa.
require 'console/logger'
require 'async'
require 'async/notification'
require 'async/semaphore'
module Async
module Pool
class Controller
# Build a pool whose resources are produced by the given block.
# @param options [Hash] keyword options passed through unchanged to the initializer.
# @yield called whenever the pool needs to construct a new resource.
# @returns [Object] a new instance of the receiving class.
def self.wrap(**options, &constructor)
  new(constructor, **options)
end
# Create an empty pool.
# @param constructor [#call] called with no arguments whenever a new resource must be created.
# @param limit [Integer | nil] maximum number of resources the pool will hold; nil means no limit.
# @param concurrency [Integer | nil] value given to the semaphore which guards acquisition; defaults to the limit, or 1 when there is no limit.
def initialize(constructor, limit: nil, concurrency: nil)
  @constructor = constructor
  @limit = limit

  # Every resource owned by the pool, mapped to its current usage count:
  @resources = {}

  # Resources which may be available to be acquired.
  # This list may contain false positives, i.e. resources which were okay but have since become unusable, so entries are re-checked before being handed out:
  @available = []

  @notification = Async::Notification.new

  # Unless told otherwise, match the concurrency to the limit for maximum performance, falling back to one acquisition at a time:
  @guard = Async::Semaphore.new(concurrency || limit || 1)

  # The gardener task, started lazily when the first resource is created:
  @gardener = nil
end
# @attribute [Hash(Resource, Integer)] Every resource currently owned by the pool, mapped to its usage count.
attr_reader :resources
# The number of resources currently owned by the pool, whether in use or idle.
# @returns [Integer]
def size
  @resources.length
end
# Whether the pool currently owns at least one resource (in use or not).
# @returns [Boolean]
def active?
  @resources.size > 0
end
# Whether any resource is currently in use, i.e. has a usage count above zero.
# @returns [Boolean]
def busy?
  # `any?` stops at the first match and does not build an intermediate array:
  @resources.any? { |_resource, usage| usage > 0 }
end
# Whether the availability list holds any resource, i.e. whether {#acquire} may be able to reuse an existing resource.
# Entries can be stale, so this is a hint rather than a guarantee.
# @returns [Boolean]
def available?
  !@available.none?
end
# Wait on the pool's notification, which is signalled whenever a resource is handed back for reuse or retired.
# @returns whatever the notification's wait call returns.
def wait
@notification.wait
end
# Whether the pool owns no resources at all.
# @returns [Boolean]
def empty?
  @resources.size.zero?
end
# Acquire a resource from the pool, waiting until one can be obtained.
# Without a block, the resource is returned and the caller is responsible for passing it to {#release}.
# With a block, the resource is yielded and then released, even if the block raises.
# @yield resource [Resource] the acquired resource.
# @returns [Object] the resource (no block), or the value of the block.
def acquire
  resource = wait_for_resource

  if block_given?
    begin
      yield resource
    ensure
      release(resource)
    end
  else
    resource
  end
end
# Hand a resource back to the pool. A reusable resource is made available again and waiting tasks are notified; anything else is retired.
# @param resource [Resource] a resource previously obtained from {#acquire}.
def release(resource)
  reused = false

  # A resource that is not good should also not be reusable.
  reused = reuse(resource) if resource.reusable?
ensure
  # Covers three cases: not reusable, reuse declined, or reuse raised.
  retire(resource) unless reused
end
# Close every resource owned by the pool and leave the pool empty.
# A warning is logged for each resource that is still in use when it is closed. Finally the gardener task, if any, is stopped.
def close
  @available.clear

  until @resources.empty?
    # Remove the entry before closing it, so the pool never references a closed resource:
    resource, usage = @resources.shift

    if usage.positive?
      Console.logger.warn(self, resource: resource, usage: usage) {"Closing resource while still in use!"}
    end

    resource.close
  end

  @gardener&.stop
end
# A short human readable summary: the class name and resource count against the limit, followed by per-resource details when the pool is not empty.
# @returns [String]
def to_s
  return "\#<#{self.class}(#{usage_string})>" if @resources.empty?

  "\#<#{self.class}(#{usage_string}) #{availability_string}>"
end
# Retire (and close) unused resources until at most `retain` resources remain. If a block is provided, it is called instead of retiring and should implement the desired functionality for unused resources.
# Nothing is touched when the pool already holds `retain` resources or fewer.
# @param retain [Integer] the minimum number of resources to retain.
# @yield resource [Resource] unused resources.
# @returns [Integer] the number of unused resources found, which can exceed the number actually processed when `retain` stops the loop early.
def prune(retain = 0)
  # Snapshot the idle resources. This code must not context switch:
  unused = @resources.select { |_resource, usage| usage.zero? }.keys

  # It's okay for this to context switch:
  unused.each do |resource|
    # Check before acting, otherwise a pool already at its minimum would still lose one resource:
    break if @resources.size <= retain

    if block_given?
      yield resource
    else
      retire(resource)
    end
  end

  # Update availability list:
  @available.clear

  @resources.each do |resource, usage|
    @available << resource if usage < resource.concurrency && resource.reusable?
  end

  unused.size
end
# Remove a resource from the pool for good: forget it, close it, and signal the notification so waiting tasks can retry.
# @param resource [Resource] the resource to discard.
# @returns [Boolean] always true.
def retire(resource)
  Console.logger.debug(self) { "Retire #{resource}" }

  # Forget the resource before closing it, so it is not handed out while it is being closed:
  @resources.delete(resource)
  resource.close

  @notification.signal

  true
end
protected
# Start the gardener task unless one is already running.
# The task records itself in @gardener and then yields; whenever it finishes or is stopped, the ensure clause clears @gardener and closes the pool.
# NOTE(review): presumably the transient task stays parked until the reactor shuts down - confirm against the Async::Task documentation.
def start_gardener
  unless @gardener
    Async(transient: true, annotation: "#{self.class} Gardener") do |gardener|
      @gardener = gardener

      Task.yield
    ensure
      @gardener = nil
      close
    end
  end
end
# The resource count over the limit, e.g. "2/8", with an infinity sign standing in for a missing limit.
# @returns [String]
def usage_string
  limit = @limit || '∞'

  "#{@resources.size}/#{limit}"
end
# One "usage/concurrency/count" entry per resource, joined by semicolons. An asterisk after the concurrency marks a resource which is no longer viable.
# @returns [String]
def availability_string
  entries = @resources.map do |resource, usage|
    "#{usage}/#{resource.concurrency}#{'*' unless resource.viable?}/#{resource.count}"
  end

  entries.join(";")
end
# The number of resources which currently have at least one user.
# @returns [Integer]
def usage
  @resources.each_value.count(&:positive?)
end
# The number of resources which currently have no users.
# @returns [Integer]
def free
  @resources.each_value.count(&:zero?)
end
# Give back one usage of a resource so it can be acquired again, and signal the notification.
# @param resource [Resource] a resource currently in use.
# @returns [Boolean] always true.
# @raises [RuntimeError] if the resource is not tracked by the pool, or is tracked but not currently in use.
def reuse(resource)
  Console.logger.debug(self) {"Reuse #{resource}"}

  usage = @resources[resource]

  # A nil usage means the pool does not track this resource (e.g. it was already retired, or the pool was closed):
  if usage.nil? || usage.zero?
    raise "Trying to reuse unacquired resource: #{resource}!"
  end

  # If the resource was fully utilized, it now becomes available:
  @available.push(resource) if usage == resource.concurrency

  @resources[resource] = usage - 1

  @notification.signal

  true
end
# Keep trying to obtain a resource, waiting on the notification between attempts.
# @returns [Object] a resource in a "used" state.
def wait_for_resource
  resource = available_resource

  # If no resource could be reused or created, wait for one to become available and try again:
  until resource
    @notification.wait
    resource = available_resource
  end

  Console.logger.debug(self) {"Wait for resource -> #{resource}"}

  # if resource.concurrency > 1
  # @notification.signal
  # end

  resource
end
# Construct a brand new resource and register it with a usage of 1. The gardener is started first if it is not already running.
# @returns [Object] A new resource in a "used" state, or the falsy value returned by the constructor when creation failed.
def create_resource
  start_gardener

  resource = @constructor.call

  # The constructor might return nil, which means creating the resource failed:
  return resource unless resource

  @resources[resource] = 1

  # Make the resource available if it can be used multiple times:
  @available.push(resource) if resource.concurrency > 1

  resource
end
# Try once to obtain a resource while holding the guard.
# @returns [Object] A resource in a "used" state (reused or newly created), or nil if none could be obtained.
def available_resource
  acquired = nil

  begin
    @guard.acquire { acquired = get_resource }

    acquired
  rescue Exception
    # Deliberately broad: whatever interrupts us after a resource was taken, give it back before re-raising.
    reuse(acquired) if acquired
    raise
  end
end
# Take a resource from the end of the availability list, discarding stale entries along the way. If none is usable, create a new resource provided the limit allows it.
# @returns [Object] A resource in a "used" state, or nil if nothing is available and the limit has been reached (or creation failed).
private def get_resource
  while (candidate = @available.last)
    usage = @resources[candidate]

    unless usage && usage < candidate.concurrency
      # The resource has been removed already (or is fully used), so skip it and remove it from the availability list.
      @available.pop
      next
    end

    unless candidate.viable?
      retire(candidate)
      @available.pop
      next
    end

    usage = (@resources[candidate] += 1)

    # The resource is used up to its limit:
    @available.pop if usage == candidate.concurrency

    return candidate
  end

  return unless @limit.nil? || @resources.size < @limit

  Console.logger.debug(self) {"No available resources, allocating new one..."}

  create_resource
end
end
end
end
|